package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.messages.ForceCommitMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.api.resources.PauseStatus;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.StringUtil;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.class */
public class PinotLLCRealtimeSegmentManager {
    // NOTE(review): presumably the ideal-state key consulted by isTablePaused(...) — that helper is not
    // visible in this part of the file; confirm.
    public static final String IS_TABLE_PAUSED = "isTablePaused";
    // Assigned outside this declaration (no initializer here).
    private static final Logger LOGGER;
    private static final int STARTING_SEQUENCE_NUMBER = 0;
    // Config prefix passed to controllerConf.subset(...) when loading the metadata event notifier factory.
    private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
    // Upper bound, in milliseconds, that stop() reports for awaiting in-flight segment metadata commits.
    private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30000;
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300000;
    private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS = 3600000;
    // Assigned outside this declaration (no initializer here).
    private static final Random RANDOM;
    // The next five fields are all obtained from the PinotHelixResourceManager given to the constructor.
    private final HelixAdmin _helixAdmin;
    private final HelixManager _helixManager;
    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private final PinotHelixResourceManager _helixResourceManager;
    private final String _clusterName;
    private final ControllerConf _controllerConf;
    private final ControllerMetrics _controllerMetrics;
    private final MetadataEventNotifierFactory _metadataEventNotifierFactory;
    // Size of _idealStateUpdateLocks; commitSegmentMetadataInternal picks a lock by
    // (table name hash & Integer.MAX_VALUE) % _numIdealStateUpdateLocks.
    private final int _numIdealStateUpdateLocks;
    private final Lock[] _idealStateUpdateLocks;
    private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
    private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
    private final boolean _isTmpSegmentAsyncDeletionEnabled;
    private final int _deepstoreUploadRetryTimeoutMs;
    // The next three fields are null unless _isDeepStoreLLCSegmentUploadRetryEnabled is true (see constructor).
    private final FileUploadDownloadClient _fileUploadDownloadClient;
    private final ExecutorService _deepStoreUploadExecutor;
    private final Set<String> _deepStoreUploadExecutorPendingSegments;
    // Compiler-generated flag guarding `assert` checks (marked synthetic by the decompiler).
    static final /* synthetic */ boolean $assertionsDisabled;
    // Incremented/decremented around commitSegmentMetadataInternal(...) and polled by stop().
    private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
    // Set to true by stop(); the public operations visible here reject calls once it is set.
    private volatile boolean _isStopping = false;

    public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
        this._helixAdmin = pinotHelixResourceManager.getHelixAdmin();
        this._helixManager = pinotHelixResourceManager.getHelixZkManager();
        this._propertyStore = pinotHelixResourceManager.getPropertyStore();
        this._helixResourceManager = pinotHelixResourceManager;
        this._clusterName = pinotHelixResourceManager.getHelixClusterName();
        this._controllerConf = controllerConf;
        this._controllerMetrics = controllerMetrics;
        this._metadataEventNotifierFactory = MetadataEventNotifierFactory.loadFactory(controllerConf.subset(METADATA_EVENT_NOTIFIER_PREFIX), pinotHelixResourceManager);
        this._numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
        this._idealStateUpdateLocks = new Lock[this._numIdealStateUpdateLocks];
        for (int i = 0; i < this._numIdealStateUpdateLocks; i++) {
            this._idealStateUpdateLocks[i] = new ReentrantLock();
        }
        this._flushThresholdUpdateManager = new FlushThresholdUpdateManager();
        this._isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
        this._isTmpSegmentAsyncDeletionEnabled = controllerConf.isTmpSegmentAsyncDeletionEnabled();
        this._deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
        this._fileUploadDownloadClient = this._isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null;
        this._deepStoreUploadExecutor = this._isDeepStoreLLCSegmentUploadRetryEnabled ? Executors.newFixedThreadPool(controllerConf.getDeepStoreRetryUploadParallelism()) : null;
        this._deepStoreUploadExecutorPendingSegments = this._isDeepStoreLLCSegmentUploadRetryEnabled ? ConcurrentHashMap.newKeySet() : null;
    }

    /** Returns the deep-store LLC segment upload retry flag captured from the controller configuration. */
    public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
        return _isDeepStoreLLCSegmentUploadRetryEnabled;
    }

    /** Returns the temporary-segment async deletion flag captured from the controller configuration. */
    public boolean isTmpSegmentAsyncDeletionEnabled() {
        return _isTmpSegmentAsyncDeletionEnabled;
    }

    /**
     * Creates the client stored in {@code _fileUploadDownloadClient}; invoked from the constructor when
     * deep-store upload retry is enabled.
     */
    @VisibleForTesting
    FileUploadDownloadClient initFileUploadDownloadClient() {
        FileUploadDownloadClient client = new FileUploadDownloadClient();
        return client;
    }

    /**
     * Builds one consumption status per partition group from the segments listed in the ideal state.
     *
     * <p>For every partition group only the segment with the highest sequence number is considered; its start
     * offset, end offset (may be {@code null}) and status are read from the segment ZK metadata.
     *
     * @param idealState ideal state whose map-field keys are the segment names to inspect
     * @param streamConfig stream config supplying the table name and the offset factory
     * @return consumption status of the latest segment of each partition group
     */
    public List<PartitionGroupConsumptionStatus> getPartitionGroupConsumptionStatusList(IdealState idealState, StreamConfig streamConfig) {
        // Latest (highest sequence number) LLC segment seen so far, keyed by partition group id.
        Map<Integer, LLCSegmentName> latestSegmentByPartitionGroup = new HashMap<>();
        for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
            LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
            if (llcSegmentName == null) {
                // Names for which LLCSegmentName.of yields null are ignored.
                continue;
            }
            // Keep the existing entry only when it is strictly newer; otherwise the candidate wins.
            latestSegmentByPartitionGroup.merge(llcSegmentName.getPartitionGroupId(), llcSegmentName,
                    (current, candidate) -> current.getSequenceNumber() > candidate.getSequenceNumber() ? current : candidate);
        }

        StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
        List<PartitionGroupConsumptionStatus> statusList = new ArrayList<>();
        for (Map.Entry<Integer, LLCSegmentName> entry : latestSegmentByPartitionGroup.entrySet()) {
            int partitionGroupId = entry.getKey();
            LLCSegmentName latestSegment = entry.getValue();
            SegmentZKMetadata zkMetadata = getSegmentZKMetadata(streamConfig.getTableNameWithType(), latestSegment.getSegmentName());
            StreamPartitionMsgOffset startOffset = offsetFactory.create(zkMetadata.getStartOffset());
            // The end offset may not be set yet, in which case null is passed through.
            StreamPartitionMsgOffset endOffset = zkMetadata.getEndOffset() == null ? null : offsetFactory.create(zkMetadata.getEndOffset());
            statusList.add(new PartitionGroupConsumptionStatus(partitionGroupId, latestSegment.getSequenceNumber(), startOffset, endOffset, zkMetadata.getStatus().toString()));
        }
        return statusList;
    }

    /** Returns the VIP URL generated by the controller configuration. */
    public String getControllerVipUrl() {
        String vipUrl = _controllerConf.generateVipUrl();
        return vipUrl;
    }

    /**
     * Marks the manager as stopping, waits for in-flight segment metadata commits, then closes the upload client.
     *
     * <p>The wait polls {@code _numCompletingSegments} in steps of at most one second, for at most
     * {@link #MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS}. If the waiting thread is interrupted the wait ends
     * early, the thread's interrupt status is restored, and the upload client is still closed so that it is not
     * leaked.
     */
    public void stop() {
        _isStopping = true;
        LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
        long remainingWaitMs = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
        boolean interrupted = false;
        while (_numCompletingSegments.get() > 0 && remainingWaitMs > 0) {
            long sleepMs = Math.min(1000L, remainingWaitMs);
            try {
                Thread.sleep(sleepMs);
                remainingWaitMs -= sleepMs;
            } catch (InterruptedException e) {
                LOGGER.info("Interrupted: Remaining wait time {} (out of {})", remainingWaitMs, MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                interrupted = true;
                break;
            }
        }
        if (!interrupted) {
            LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
        }
        // Release the client on every exit path, including an interrupted wait.
        if (_fileUploadDownloadClient != null) {
            try {
                _fileUploadDownloadClient.close();
            } catch (IOException e) {
                LOGGER.error("Failed to close fileUploadDownloadClient.", e);
            }
        }
    }

    /**
     * Sets up a new LLC table: clears its flush threshold updater, creates a new partition group for each
     * partition group reported for the stream, records the resulting segments in the ideal state's map fields
     * and writes the ideal state.
     *
     * @param tableConfig table config of the new table
     * @param idealState ideal state to populate and persist
     * @throws IllegalStateException if the manager is stopping
     */
    public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String realtimeTableName = tableConfig.getTableName();
        LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
        _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);

        StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        // No existing consumption status: every partition group is treated as new.
        List<PartitionGroupMetadata> partitionGroups = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
        int numPartitionGroups = partitionGroups.size();
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);

        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);

        long creationTimeMs = getCurrentTimeMs();
        Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
        for (PartitionGroupMetadata partitionGroup : partitionGroups) {
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, setupNewPartitionGroup(tableConfig, streamConfig, partitionGroup, creationTimeMs, instancePartitions, numPartitionGroups, numReplicas), segmentAssignment, instancePartitionsMap);
        }

        setIdealState(realtimeTableName, idealState);
    }

    /**
     * Reads the table config for {@code str} from the property store.
     *
     * @param str table name used for the lookup
     * @return the table config, never {@code null}
     * @throws IllegalStateException if no table config exists; any failure also bumps
     *         {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table before being rethrown
     */
    @VisibleForTesting
    public TableConfig getTableConfig(String str) {
        try {
            TableConfig found = ZKMetadataProvider.getTableConfig(_propertyStore, str);
            Preconditions.checkState(found != null, "Failed to find table config for table: " + str);
            return found;
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Fetches or computes the CONSUMING instance partitions for the table.
     *
     * <p>Any failure bumps {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table and is rethrown unchanged.
     *
     * @param tableConfig table config identifying the table
     * @return the CONSUMING instance partitions
     */
    @VisibleForTesting
    InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
        try {
            InstancePartitions consuming = InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, tableConfig, InstancePartitionsType.CONSUMING);
            return consuming;
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Lists the segments of table {@code str} via {@code ZKMetadataProvider.getSegments}.
     *
     * <p>Any failure bumps {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table and is rethrown unchanged.
     *
     * @param str table name used for the lookup
     * @return segment names
     */
    @VisibleForTesting
    List<String> getAllSegments(String str) {
        try {
            List<String> segments = ZKMetadataProvider.getSegments(_propertyStore, str);
            return segments;
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Lists the LLC realtime segments of table {@code str} via {@code ZKMetadataProvider.getLLCRealtimeSegments}.
     *
     * <p>Any failure bumps {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table and is rethrown unchanged.
     *
     * @param str table name used for the lookup
     * @return LLC segment names
     */
    @VisibleForTesting
    List<String> getLLCSegments(String str) {
        try {
            List<String> llcSegments = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, str);
            return llcSegments;
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Convenience overload that reads segment ZK metadata without capturing the ZNode {@link Stat}.
     *
     * @param str table name
     * @param str2 segment name
     * @return the segment ZK metadata
     */
    private SegmentZKMetadata getSegmentZKMetadata(String str, String str2) {
        Stat noStat = null;
        return getSegmentZKMetadata(str, str2, noStat);
    }

    /**
     * Reads the ZK metadata of segment {@code str2} of table {@code str} from the property store.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     *
     * @param str table name
     * @param str2 segment name
     * @param stat optional holder passed to the property store read; may be {@code null}
     * @return the segment ZK metadata
     * @throws IllegalStateException if no record exists; any failure also bumps
     *         {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table before being rethrown
     */
    @VisibleForTesting
    public SegmentZKMetadata getSegmentZKMetadata(String str, String str2, @Nullable Stat stat) {
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, str2);
            ZNRecord record = _propertyStore.get(segmentPath, stat, AccessOption.PERSISTENT);
            Preconditions.checkState(record != null, "Failed to find segment ZK metadata for segment: %s of table: %s", str2, str);
            return new SegmentZKMetadata(record);
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Writes the given segment ZK metadata to the property store.
     *
     * @param str table name
     * @param segmentZKMetadata metadata to persist
     * @param i expected ZNode version passed to the property store write
     * @throws IllegalStateException if the property store reports the write as unsuccessful; any failure also
     *         bumps {@code LLC_ZOOKEEPER_UPDATE_FAILURES} for the table before being rethrown
     */
    @VisibleForTesting
    void persistSegmentZKMetadata(String str, SegmentZKMetadata segmentZKMetadata, int i) {
        String segmentName = segmentZKMetadata.getSegmentName();
        LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
        try {
            String segmentPath = ZKMetadataProvider.constructPropertyStorePathForSegment(str, segmentName);
            boolean persisted = _propertyStore.set(segmentPath, segmentZKMetadata.toZNRecord(), i, AccessOption.PERSISTENT);
            Preconditions.checkState(persisted, "Failed to persist segment ZK metadata for segment: %s of table: %s", segmentName, str);
        } catch (Exception updateFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw updateFailure;
        }
    }

    /**
     * Reads the ideal state of table {@code str} through the Helix manager.
     *
     * @param str table name
     * @return the ideal state, never {@code null}
     * @throws IllegalStateException if no ideal state exists; any failure also bumps
     *         {@code LLC_ZOOKEEPER_FETCH_FAILURES} for the table before being rethrown
     */
    @VisibleForTesting
    IdealState getIdealState(String str) {
        try {
            IdealState current = HelixHelper.getTableIdealState(_helixManager, str);
            if (current == null) {
                throw new IllegalStateException("Failed to find IdealState for table: " + str);
            }
            return current;
        } catch (Exception fetchFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
            throw fetchFailure;
        }
    }

    /**
     * Writes the ideal state of table {@code str} through the Helix admin.
     *
     * <p>Any failure bumps {@code LLC_ZOOKEEPER_UPDATE_FAILURES} for the table and is rethrown unchanged.
     *
     * @param str table name (Helix resource name)
     * @param idealState ideal state to store
     */
    @VisibleForTesting
    void setIdealState(String str, IdealState idealState) {
        try {
            _helixAdmin.setResourceIdealState(_clusterName, str, idealState);
        } catch (Exception updateFailure) {
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw updateFailure;
        }
    }

    /**
     * Moves the committed segment file to its final location and records that location on the descriptor.
     *
     * <p>Nothing is moved when the segment location uses the {@code peer://} scheme (case-insensitive). Unless
     * async temporary-segment deletion is enabled, temporary files for the segment found directly under the
     * table's data directory are deleted on a best-effort basis; failures there are only logged.
     *
     * @param str table name; the raw table name is derived from it
     * @param committingSegmentDescriptor descriptor of the committing segment; its location is updated
     * @throws IllegalStateException if the manager is stopping
     * @throws IllegalArgumentException if the descriptor has no segment location
     * @throws Exception if building the table URI or moving the segment file fails
     */
    public void commitSegmentFile(String str, CommittingSegmentDescriptor committingSegmentDescriptor) throws Exception {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        String rawTableName = TableNameBuilder.extractRawTableName(str);
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Committing segment file for segment: {}", segmentName);

        String uploadedLocation = committingSegmentDescriptor.getSegmentLocation();
        Preconditions.checkArgument(uploadedLocation != null, "Segment location must be provided");
        String peerScheme = "peer://";
        boolean heldByPeers = uploadedLocation.regionMatches(true, 0, peerScheme, 0, peerScheme.length());
        if (heldByPeers) {
            LOGGER.info("No moving needed for segment on peer servers: {}", uploadedLocation);
            return;
        }

        URI tableDirUri = URIUtils.getUri(_controllerConf.getDataDir(), new String[]{rawTableName});
        PinotFS pinotFS = PinotFSFactory.create(tableDirUri.getScheme());
        String finalLocation = moveSegmentFile(rawTableName, segmentName, uploadedLocation, pinotFS);

        if (!isTmpSegmentAsyncDeletionEnabled()) {
            // Best-effort cleanup: a failure here must not fail the commit.
            try {
                String[] tableDirFiles = pinotFS.listFiles(tableDirUri, false);
                for (String filePath : tableDirFiles) {
                    if (!filePath.contains(SegmentCompletionUtils.getTmpSegmentNamePrefix(segmentName))) {
                        continue;
                    }
                    LOGGER.warn("Deleting temporary segment file: {}", filePath);
                    boolean deleted = pinotFS.delete(new URI(filePath), true);
                    Preconditions.checkState(deleted, "Failed to delete file: %s", filePath);
                }
            } catch (Exception cleanupFailure) {
                LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, cleanupFailure);
            }
        }
        committingSegmentDescriptor.setSegmentLocation(finalLocation);
    }

    /**
     * Commits the segment metadata while tracking the operation in {@code _numCompletingSegments}, which
     * {@link #stop()} polls while waiting for in-flight commits.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if the manager is stopping
     */
    public void commitSegmentMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        Preconditions.checkState(!_isStopping, "Segment manager is stopping");
        _numCompletingSegments.incrementAndGet();
        try {
            commitSegmentMetadataInternal(str, committingSegmentDescriptor);
        } finally {
            _numCompletingSegments.decrementAndGet();
        }
    }

    /**
     * Performs the metadata commit of a completed segment in three steps.
     *
     * <ol>
     *   <li>Update the committing segment's ZK metadata and send a segment refresh message.</li>
     *   <li>Unless the table is paused, and only if the committing segment's partition group is still among the
     *       stream's partition ids, create ZK metadata for the next consuming segment (sequence number + 1).</li>
     *   <li>Update the ideal state while holding the lock selected by the table name hash.</li>
     * </ol>
     *
     * <p>The lock is acquired before entering the try block and released exactly once in {@code finally}, so a
     * failure in the logging or notification that follows cannot trigger a second unlock.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @throws IllegalStateException if the ideal state has no instance in CONSUMING state for the segment
     */
    private void commitSegmentMetadataInternal(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String committingSegmentName = committingSegmentDescriptor.getSegmentName();
        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
        int partitionGroupId = committingLLCSegment.getPartitionGroupId();
        LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
        if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
            LOGGER.warn("Committing segment: {} was not uploaded to deep store", committingSegmentName);
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1L);
        }

        TableConfig tableConfig = getTableConfig(str);
        InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
        IdealState idealState = getIdealState(str);
        // A segment missing from the ideal state yields a null map; report it like a missing CONSUMING replica.
        Map<String, String> instanceStateMap = idealState.getInstanceStateMap(committingSegmentName);
        Preconditions.checkState(instanceStateMap != null && instanceStateMap.containsValue("CONSUMING"), "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);

        // Step 1: mark the committing segment as done in ZK and notify via a refresh message.
        long startTimeNs = System.nanoTime();
        SegmentZKMetadata committingSegmentZKMetadata = updateCommittingSegmentZKMetadata(str, committingSegmentDescriptor);
        _helixResourceManager.sendSegmentRefreshMessage(str, committingSegmentName, false, true);

        // Step 2: create the next consuming segment's ZK metadata when consumption should continue.
        long committedTimeNs = System.nanoTime();
        String newConsumingSegmentName = null;
        if (!isTablePaused(idealState)) {
            StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
            Set<Integer> partitionIds;
            try {
                partitionIds = getPartitionIds(streamConfig);
            } catch (Exception e) {
                LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. Reading all partition group metadata to determine partition ids.", str, e.toString());
                partitionIds = getNewPartitionGroupMetadataList(streamConfig, getPartitionGroupConsumptionStatusList(idealState, streamConfig)).stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
            }
            if (partitionIds.contains(partitionGroupId)) {
                String rawTableName = TableNameBuilder.extractRawTableName(str);
                long newSegmentCreationTimeMs = getCurrentTimeMs();
                LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, partitionGroupId, committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
                createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(), numReplicas);
                newConsumingSegmentName = newLLCSegment.getSegmentName();
            }
        }

        // Step 3: update the ideal state under the lock selected for this table.
        long newSegmentCreatedTimeNs = System.nanoTime();
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
        // Masking with Integer.MAX_VALUE keeps the index non-negative for negative hash codes.
        Lock lock = _idealStateUpdateLocks[(str.hashCode() & Integer.MAX_VALUE) % _numIdealStateUpdateLocks];
        lock.lock();
        try {
            updateIdealStateOnSegmentCompletion(str, committingSegmentName, newConsumingSegmentName, segmentAssignment, instancePartitionsMap);
        } finally {
            lock.unlock();
        }

        long endTimeNs = System.nanoTime();
        LOGGER.info("Finished committing segment metadata for segment: {}. Time taken for updating committing segment metadata: {}ms; creating new consuming segment ({}) metadata: {}ms; updating ideal state: {}ms; total: {}ms", new Object[]{committingSegmentName, TimeUnit.NANOSECONDS.toMillis(committedTimeNs - startTimeNs), newConsumingSegmentName, TimeUnit.NANOSECONDS.toMillis(newSegmentCreatedTimeNs - committedTimeNs), TimeUnit.NANOSECONDS.toMillis(endTimeNs - newSegmentCreatedTimeNs), TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs)});
        _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
    }

    /**
     * Marks the committing segment as DONE in its ZK metadata and persists the result.
     *
     * <p>The end offset, download URL (empty for {@code peer://} locations), CRC, time range, index version,
     * total docs and partition metadata are filled in from the descriptor and its segment metadata. For a
     * segment with no documents the current wall-clock time is used as both start and end time. The write uses
     * the ZNode version observed when the metadata was read.
     *
     * @param str table name
     * @param committingSegmentDescriptor descriptor of the committing segment
     * @return the updated segment ZK metadata
     * @throws IllegalStateException if the segment is not IN_PROGRESS or the descriptor carries no segment metadata
     */
    private SegmentZKMetadata updateCommittingSegmentZKMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
        String segmentName = committingSegmentDescriptor.getSegmentName();
        LOGGER.info("Updating segment ZK metadata for committing segment: {}", segmentName);

        Stat readStat = new Stat();
        SegmentZKMetadata zkMetadata = getSegmentZKMetadata(str, segmentName, readStat);
        Preconditions.checkState(zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS, "Segment status for segment: %s should be IN_PROGRESS, found: %s", segmentName, zkMetadata.getStatus());
        SegmentMetadataImpl builtSegmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
        Preconditions.checkState(builtSegmentMetadata != null, "Failed to find segment metadata from descriptor for segment: %s", segmentName);

        zkMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
        String downloadUrl;
        if (isPeerURL(committingSegmentDescriptor.getSegmentLocation())) {
            downloadUrl = "";
        } else {
            downloadUrl = committingSegmentDescriptor.getSegmentLocation();
        }
        zkMetadata.setDownloadUrl(downloadUrl);
        zkMetadata.setCrc(Long.valueOf(builtSegmentMetadata.getCrc()));

        if (builtSegmentMetadata.getTotalDocs() <= 0) {
            // Empty segment: no time interval available, fall back to "now" for both bounds.
            long nowMs = System.currentTimeMillis();
            zkMetadata.setStartTime(nowMs);
            zkMetadata.setEndTime(nowMs);
        } else {
            Preconditions.checkNotNull(builtSegmentMetadata.getTimeInterval(), "start/end time information is not correctly written to the segment for table: " + str);
            zkMetadata.setStartTime(builtSegmentMetadata.getTimeInterval().getStartMillis());
            zkMetadata.setEndTime(builtSegmentMetadata.getTimeInterval().getEndMillis());
        }
        zkMetadata.setTimeUnit(TimeUnit.MILLISECONDS);

        SegmentVersion indexVersion = builtSegmentMetadata.getVersion();
        if (indexVersion != null) {
            zkMetadata.setIndexVersion(indexVersion.name());
        }
        zkMetadata.setTotalDocs(builtSegmentMetadata.getTotalDocs());
        zkMetadata.setPartitionMetadata(getPartitionMetadataFromSegmentMetadata(builtSegmentMetadata));

        persistSegmentZKMetadata(str, zkMetadata, readStat.getVersion());
        return zkMetadata;
    }

    /**
     * Tells whether {@code str} starts with the {@code peer://} scheme, ignoring case.
     *
     * @param str location to test; may be {@code null}
     * @return {@code true} for a non-null location whose lower-cased form starts with {@code peer://}
     */
    private boolean isPeerURL(String str) {
        if (str == null) {
            return false;
        }
        return str.toLowerCase().startsWith("peer://");
    }

    /**
     * Creates and persists the ZK metadata of a new IN_PROGRESS segment.
     *
     * <p>The new segment starts at the committing descriptor's next offset. Partition metadata derived from the
     * table config is attached when available, and the flush threshold updater for the stream fills in the
     * flush threshold before the metadata is written with expected version {@code -1}.
     *
     * @param tableConfig table config
     * @param streamConfig stream config used to select the flush threshold updater
     * @param lLCSegmentName name of the new segment
     * @param j creation time of the new segment
     * @param committingSegmentDescriptor descriptor supplying the start offset
     * @param segmentZKMetadata ZK metadata of the committing segment, forwarded to the flush threshold updater;
     *        may be {@code null}
     * @param instancePartitions consuming instance partitions
     * @param i number of partition groups
     * @param i2 number of replicas
     */
    private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig streamConfig, LLCSegmentName lLCSegmentName, long j, CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata segmentZKMetadata, InstancePartitions instancePartitions, int i, int i2) {
        String realtimeTableName = tableConfig.getTableName();
        String newSegmentName = lLCSegmentName.getSegmentName();
        String startOffset = committingSegmentDescriptor.getNextOffset();
        LOGGER.info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", new Object[]{newSegmentName, startOffset, j});

        SegmentZKMetadata newSegmentZKMetadata = new SegmentZKMetadata(newSegmentName);
        newSegmentZKMetadata.setCreationTime(j);
        newSegmentZKMetadata.setStartOffset(startOffset);
        newSegmentZKMetadata.setNumReplicas(i2);
        newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);

        SegmentPartitionMetadata partitionMetadata = getPartitionMetadataFromTableConfig(tableConfig, lLCSegmentName.getPartitionGroupId(), i);
        if (partitionMetadata != null) {
            newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
        }

        _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig).updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, segmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, i, i2));

        // -1 is the expected version handed to the property store write for this new record.
        persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
    }

    /**
     * Derives segment partition metadata for a partition group from the table's segment partition config.
     *
     * <p>The stream's partition count ({@code i2}) is used as the number of partitions even when it differs from
     * the configured value; a warning is logged in that case.
     *
     * @param tableConfig table config holding the indexing config
     * @param i partition group id recorded as the segment's single partition
     * @param i2 number of partition groups fetched from the stream
     * @return the partition metadata, or {@code null} when there is no segment partition config or it does not
     *         contain exactly one partition column
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int i, int i2) {
        SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
        if (segmentPartitionConfig == null) {
            return null;
        }
        Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
        if (columnPartitionMap.size() != 1) {
            LOGGER.warn("Skip persisting partition metadata because there are other than exact one partition column for table: {}", tableConfig.getTableName());
            return null;
        }
        Map.Entry<String, ColumnPartitionConfig> onlyEntry = columnPartitionMap.entrySet().iterator().next();
        ColumnPartitionConfig columnPartitionConfig = onlyEntry.getValue();
        if (i2 != columnPartitionConfig.getNumPartitions()) {
            LOGGER.warn("Number of partition groups fetched from the stream '{}' is different than columnPartitionConfig.numPartitions '{}' in the table config. The stream partition count is used. Please update the table config accordingly.", i2, columnPartitionConfig.getNumPartitions());
        }
        ColumnPartitionMetadata columnPartitionMetadata = new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), i2, Collections.singleton(i), columnPartitionConfig.getFunctionConfig());
        return new SegmentPartitionMetadata(Collections.singletonMap(onlyEntry.getKey(), columnPartitionMetadata));
    }

    /**
     * Extracts partition metadata from the first column of the segment that has a partition function.
     *
     * @param segmentMetadataImpl metadata of the built segment
     * @return partition metadata for that single column, or {@code null} when no column has a partition function
     */
    @Nullable
    private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadataImpl) {
        for (Map.Entry<String, ColumnMetadata> entry : segmentMetadataImpl.getColumnMetadataMap().entrySet()) {
            ColumnMetadata columnMetadata = entry.getValue();
            PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
            if (partitionFunction == null) {
                continue;
            }
            // Only the first partitioned column encountered is reported.
            ColumnPartitionMetadata columnPartitionMetadata = new ColumnPartitionMetadata(partitionFunction.getName(), partitionFunction.getNumPartitions(), columnMetadata.getPartitions(), partitionFunction.getFunctionConfig());
            return new SegmentPartitionMetadata(Collections.singletonMap(entry.getKey(), columnPartitionMetadata));
        }
        return null;
    }

    /**
     * Returns the segment commit timeout for table {@code str} in milliseconds.
     *
     * <p>The value comes from the {@code realtime.segment.commit.timeoutSeconds} entry of the table's stream
     * config. The protocol default ({@code SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()}) is returned
     * when there is no property store, the entry is absent, or its value is not a valid integer.
     *
     * @param str table name
     * @return commit timeout in milliseconds
     */
    public long getCommitTimeoutMS(String str) {
        long defaultTimeoutMs = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
        if (_propertyStore == null) {
            return defaultTimeoutMs;
        }
        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(getTableConfig(str));
        String timeoutSeconds = streamConfigMap.get("realtime.segment.commit.timeoutSeconds");
        if (timeoutSeconds == null) {
            return defaultTimeoutMs;
        }
        try {
            return TimeUnit.SECONDS.toMillis(Integer.parseInt(timeoutSeconds));
        } catch (NumberFormatException e) {
            LOGGER.warn("Failed to parse segment commit timeout (seconds) of {}", timeoutSeconds, e);
            return defaultTimeoutMs;
        }
    }

    /**
     * Fetches the partition ids of the stream described by the given stream config.
     *
     * <p>A stream metadata provider is created for the call and is always closed before returning, whether the fetch
     * succeeds or fails. If both the fetch and the close fail, the close failure is attached to the fetch failure as
     * a suppressed exception.
     *
     * @param streamConfig stream config used to create the metadata provider; its table name and topic name are part
     *                     of the client id handed to the provider factory
     * @return the partition ids returned by the provider
     * @throws Exception if fetching the partition ids or closing the provider fails
     */
    @VisibleForTesting
    Set<Integer> getPartitionIds(StreamConfig streamConfig) throws Exception {
        String clientId = PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" + streamConfig.getTopicName();
        try (StreamMetadataProvider metadataProvider = StreamConsumerFactoryProvider.create(streamConfig).createStreamMetadataProvider(clientId)) {
            // 5000L is passed through unchanged; presumably a fetch timeout in milliseconds (confirm against the provider API).
            return metadataProvider.fetchPartitionIds(5000L);
        }
    }

    /**
     * Delegates to {@code PinotTableIdealStateBuilder.getPartitionGroupMetadataList}; kept as an instance method with
     * package visibility so tests can override it.
     *
     * @param streamConfig stream config passed through to the builder
     * @param list         current partition group consumption statuses passed through to the builder
     * @return the partition group metadata list returned by the builder
     */
    @VisibleForTesting
    List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> list) {
        List<PartitionGroupMetadata> partitionGroupMetadataList = PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, list);
        return partitionGroupMetadataList;
    }

    /**
     * Records that an instance has stopped consuming a segment by flipping that instance's state for the segment
     * from CONSUMING to OFFLINE in the table's ideal state, then makes a best-effort attempt to reset the partition
     * on that instance.
     *
     * <p>If the ideal state update fails, the {@code LLC_ZOOKEEPER_UPDATE_FAILURES} meter is incremented for the table
     * and the exception is rethrown. A failure of the subsequent partition reset is only logged.
     *
     * @param lLCSegmentName name of the segment that stopped consuming
     * @param str            name of the instance that stopped consuming
     * @throws IllegalStateException if the segment manager is stopping
     */
    public void segmentStoppedConsuming(LLCSegmentName lLCSegmentName, String str) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName());
        String segmentName = lLCSegmentName.getSegmentName();
        LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, str);
        try {
            HelixHelper.updateIdealState(this._helixManager, tableNameWithType, idealState -> {
                assert idealState != null;
                Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName);
                String currentState = instanceStateMap.get(str);
                if ("CONSUMING".equals(currentState)) {
                    instanceStateMap.put(str, "OFFLINE");
                } else {
                    // Nothing to change; the instance is not (or no longer) CONSUMING this segment.
                    LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, currentState, str);
                }
                return idealState;
            }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
        } catch (Exception e) {
            this._controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
            throw e;
        }
        // The ideal state is updated at this point. The reset below is best effort only, so a failure must not
        // propagate to the caller; it is logged instead of being silently dropped.
        try {
            this._helixAdmin.resetPartition(this._helixManager.getClusterName(), str, tableNameWithType, Collections.singletonList(segmentName));
        } catch (Exception e) {
            LOGGER.debug("Ignoring failure to reset partition for segment: {} on instance: {}", segmentName, str, e);
        }
    }

    /**
     * Returns, for every partition group that has at least one LLC segment, the ZK metadata of the segment with the
     * highest sequence number.
     *
     * @param str table name whose LLC segments are inspected
     * @return map from partition group id to the ZK metadata of that partition group's latest segment
     */
    private Map<Integer, SegmentZKMetadata> getLatestSegmentZKMetadataMap(String str) {
        // First pass: keep only the segment name with the highest sequence number per partition group.
        Map<Integer, LLCSegmentName> latestNameByPartition = new HashMap<>();
        for (String segmentName : getLLCSegments(str)) {
            LLCSegmentName candidate = new LLCSegmentName(segmentName);
            // On equal sequence numbers the entry that is already stored wins.
            latestNameByPartition.merge(candidate.getPartitionGroupId(), candidate, (current, incoming) -> incoming.getSequenceNumber() <= current.getSequenceNumber() ? current : incoming);
        }
        // Second pass: load the ZK metadata only for the selected segments.
        Map<Integer, SegmentZKMetadata> latestMetadataByPartition = new HashMap<>();
        latestNameByPartition.forEach((partitionGroupId, latestName) -> latestMetadataByPartition.put(partitionGroupId, getSegmentZKMetadata(str, latestName.getSegmentName())));
        return latestMetadataByPartition;
    }

    /**
     * Validates the table's LLC segments inside an ideal state update and repairs partitions that are not consuming.
     *
     * <p>Nothing is changed when the ideal state is disabled or the table is paused. Otherwise the new partition
     * group metadata list is fetched with the stream config's offset criteria temporarily overridden (the supplied
     * criteria when non-null, the smallest-offset criteria otherwise) and the repair overload of this method is
     * invoked with the result. The stream config's original offset criteria is restored even when the fetch throws,
     * so a retry of the updater does not observe the overridden value.
     *
     * @param tableConfig    table config; its table name identifies the ideal state to update
     * @param streamConfig   stream config of the table; its offset criteria is overridden only for the fetch
     * @param z              flag forwarded unchanged to the repair overload
     * @param offsetCriteria offset criteria to use for new consuming segments, or {@code null} to continue from the
     *                       current consumption status
     * @throws IllegalStateException if the segment manager is stopping
     */
    public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, boolean z, OffsetCriteria offsetCriteria) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String tableName = tableConfig.getTableName();
        HelixHelper.updateIdealState(this._helixManager, tableName, idealState -> {
            assert idealState != null;
            boolean isEnabled = idealState.isEnabled();
            boolean isTablePaused = isTablePaused(idealState);
            if (!isEnabled || isTablePaused) {
                LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", tableName, isEnabled, isTablePaused);
                return idealState;
            }
            boolean offsetsHaveToChange = offsetCriteria != null;
            // When an explicit offset criteria is supplied, the current consumption status is deliberately not used.
            List<PartitionGroupConsumptionStatus> currentStatusList = offsetsHaveToChange ? Collections.emptyList() : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
            OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
            List<PartitionGroupMetadata> newPartitionGroupMetadataList;
            streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
            try {
                newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, currentStatusList);
            } finally {
                // Always undo the temporary override on the caller's stream config.
                streamConfig.setOffsetCriteria(originalOffsetCriteria);
            }
            return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, z, offsetCriteria);
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
    }

    /**
     * Updates the ideal state after a segment commit: the committing segment is marked ONLINE and, unless the table
     * is paused, the new segment is added as CONSUMING.
     *
     * <p>The update is abandoned with a {@code HelixHelper.PermanentUpdaterException} when the committing segment has
     * exceeded the max segment completion time.
     *
     * @param str               table name identifying the ideal state to update
     * @param str2              name of the committing segment
     * @param str3              name of the new consuming segment; ignored when the table is paused
     * @param segmentAssignment assignment strategy used to place the new consuming segment
     * @param map               instance partitions passed to the assignment strategy
     */
    @VisibleForTesting
    void updateIdealStateOnSegmentCompletion(String str, String str2, String str3, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map) {
        HelixHelper.updateIdealState(this._helixManager, str, idealState -> {
            assert idealState != null;
            boolean timedOut = isExceededMaxSegmentCompletionTime(str, str2, getCurrentTimeMs());
            if (timedOut) {
                LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment: {}", str2);
                throw new HelixHelper.PermanentUpdaterException("Exceeded max segment completion time for segment " + str2);
            }
            Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
            String newConsumingSegment;
            if (isTablePaused(idealState)) {
                // A paused table gets no new consuming segment.
                newConsumingSegment = null;
            } else {
                newConsumingSegment = str3;
            }
            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, str2, newConsumingSegment, segmentAssignment, map);
            return idealState;
        }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
    }

    /**
     * Reads the pause flag stored as a simple field on the ideal state record.
     *
     * @param idealState ideal state of the table
     * @return {@code true} only when the {@code IS_TABLE_PAUSED} simple field parses as boolean true
     */
    private boolean isTablePaused(IdealState idealState) {
        String pausedFlag = idealState.getRecord().getSimpleField(IS_TABLE_PAUSED);
        return Boolean.parseBoolean(pausedFlag);
    }

    /**
     * Applies the instance-state changes of a segment roll-over to the given instance states map.
     *
     * <p>When a committing segment is given, all of its current instances are set to ONLINE. When a new segment is
     * given, it is assigned to instances and added in CONSUMING state, after verifying that no existing LLC segment
     * has the same partition group id and sequence number.
     *
     * @param map               segment name to (instance to state) map, modified in place
     * @param str               committing segment name, or {@code null} to skip the ONLINE update
     * @param str2              new consuming segment name, or {@code null} to skip adding a consuming segment
     * @param segmentAssignment assignment strategy used to pick instances for the new segment
     * @param map2              instance partitions passed to the assignment strategy
     * @throws HelixHelper.PermanentUpdaterException if the new segment duplicates an existing segment's partition
     *                                               group id and sequence number
     */
    @VisibleForTesting
    void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> map, @Nullable String str, @Nullable String str2, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map2) {
        if (str != null) {
            Set<String> committingInstances = map.get(str).keySet();
            map.put(str, SegmentAssignmentUtils.getInstanceStateMap(committingInstances, "ONLINE"));
            LOGGER.info("Updating segment: {} to ONLINE state", str);
        }
        if (str2 == null) {
            return;
        }
        LLCSegmentName newLlcSegmentName = new LLCSegmentName(str2);
        int newPartitionGroupId = newLlcSegmentName.getPartitionGroupId();
        int newSequenceNumber = newLlcSegmentName.getSequenceNumber();
        for (String existingSegmentName : map.keySet()) {
            LLCSegmentName existingLlcSegmentName = LLCSegmentName.of(existingSegmentName);
            if (existingLlcSegmentName == null) {
                LOGGER.debug("Skip segment name {} not in low-level consumer format", existingSegmentName);
                continue;
            }
            if (existingLlcSegmentName.getPartitionGroupId() != newPartitionGroupId || existingLlcSegmentName.getSequenceNumber() != newSequenceNumber) {
                continue;
            }
            // Same partition group and sequence number already present: refuse to add a duplicate.
            String errorMessage = String.format("Segment %s is a duplicate of existing segment %s", str2, existingSegmentName);
            LOGGER.error(errorMessage);
            throw new HelixHelper.PermanentUpdaterException(errorMessage);
        }
        List<String> assignedInstances = segmentAssignment.assignSegment(str2, map, map2);
        map.put(str2, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, "CONSUMING"));
        LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", str2, assignedInstances);
    }

    /**
     * Checks whether the given time is later than the segment ZK metadata's modification time plus
     * {@code MAX_SEGMENT_COMPLETION_TIME_MILLIS}.
     *
     * @param str  table name
     * @param str2 segment name
     * @param j    time to compare against, in milliseconds
     * @return {@code true} when the max completion time has been exceeded
     */
    @VisibleForTesting
    boolean isExceededMaxSegmentCompletionTime(String str, String str2, long j) {
        Stat stat = new Stat();
        // Only the stat is needed here; the returned metadata is intentionally unused.
        getSegmentZKMetadata(str, str2, stat);
        long metadataUpdateTimeMs = stat.getMtime();
        if (j > metadataUpdateTimeMs + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
            LOGGER.info("Segment: {} exceeds the max completion time: {}ms, metadata update time: {}, current time: {}", str2, MAX_SEGMENT_COMPLETION_TIME_MILLIS, metadataUpdateTimeMs, j);
            return true;
        }
        return false;
    }

    /**
     * Tells whether every instance in the given instance state map is in the given state.
     *
     * @param map instance name to state map
     * @param str state to compare against
     * @return {@code true} when all values equal {@code str} (also for an empty map)
     */
    private boolean isAllInstancesInState(Map<String, String> map, String str) {
        return map.values().stream().allMatch(instanceState -> instanceState.equals(str));
    }

    /**
     * Repairs the given ideal state so that partition groups have a consuming segment, based on the latest segment
     * ZK metadata of each partition group, and sets up segments for partition groups that have none yet.
     *
     * <p>For the latest segment of each partition group the following cases are handled:
     * <ul>
     *   <li>In ideal state with a CONSUMING instance, but DONE in ZK metadata and past the max completion time:
     *       mark it ONLINE and, if the partition group is still in {@code list}, create the next consuming
     *       segment.</li>
     *   <li>In ideal state and OFFLINE on all instances: create a new consuming segment.</li>
     *   <li>In ideal state, partition group still in {@code list}, {@code z} set, status completed and ONLINE on
     *       all instances: create a new consuming segment; any other combination in this branch is logged as
     *       unexpected.</li>
     *   <li>Not in ideal state, past the max completion time and IN_PROGRESS in ZK metadata: add it to the ideal
     *       state as CONSUMING, marking the partition's previous CONSUMING segment (if found) ONLINE.</li>
     * </ul>
     *
     * @param tableConfig    table config
     * @param streamConfig   stream config of the table
     * @param idealState     ideal state whose map fields are modified in place
     * @param list           partition group metadata (id and start offset) for the partition groups to consume
     * @param z              when true, allows creating a new consuming segment for a partition group whose latest
     *                       segment is completed and ONLINE on all instances
     * @param offsetCriteria offset criteria for choosing start offsets; when {@code null} the offsets from segment
     *                       ZK metadata are used (see {@code selectStartOffset})
     * @return the same ideal state instance that was passed in
     */
    @VisibleForTesting
    IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig streamConfig, IdealState idealState, List<PartitionGroupMetadata> list, boolean z, OffsetCriteria offsetCriteria) {
        String tableName = tableConfig.getTableName();
        InstancePartitions consumingInstancePartitions = getConsumingInstancePartitions(tableConfig);
        int numReplicas = getNumReplicas(tableConfig, consumingInstancePartitions);
        int size = list.size();
        // Ids of the partition groups present in the supplied metadata list.
        Set set = (Set) list.stream().map((v0) -> {
            return v0.getPartitionGroupId();
        }).collect(Collectors.toSet());
        SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(this._helixManager, tableConfig, this._controllerMetrics);
        Map<InstancePartitionsType, InstancePartitions> singletonMap = Collections.singletonMap(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
        Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
        long currentTimeMs = getCurrentTimeMs();
        StreamPartitionMsgOffsetFactory createStreamMsgOffsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
        Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(tableName);
        // Start offset per partition group id, taken from the supplied metadata list.
        HashMap hashMap = new HashMap();
        for (PartitionGroupMetadata partitionGroupMetadata : list) {
            hashMap.put(Integer.valueOf(partitionGroupMetadata.getPartitionGroupId()), partitionGroupMetadata.getStartOffset());
        }
        // Smallest stream offsets: reused from the start offsets only when the criteria is the very same
        // SMALLEST_OFFSET_CRITERIA instance (reference comparison); otherwise fetched lazily below when needed.
        Map<Integer, StreamPartitionMsgOffset> map = offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA ? hashMap : null;
        for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
            int intValue = entry.getKey().intValue();
            SegmentZKMetadata value = entry.getValue();
            String segmentName = value.getSegmentName();
            LLCSegmentName lLCSegmentName = new LLCSegmentName(segmentName);
            Map<String, String> map2 = mapFields.get(segmentName);
            if (map2 != null) {
                // The latest segment exists in the ideal state.
                if (map2.containsValue("CONSUMING")) {
                    // Only repair when ZK metadata says DONE and the completion window has passed; otherwise the
                    // segment is left untouched.
                    if (value.getStatus() == CommonConstants.Segment.Realtime.Status.DONE && isExceededMaxSegmentCompletionTime(tableName, segmentName, currentTimeMs)) {
                        if (set.contains(Integer.valueOf(intValue))) {
                            LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState", segmentName);
                            LLCSegmentName nextLLCSegmentName = getNextLLCSegmentName(lLCSegmentName, currentTimeMs);
                            String segmentName2 = nextLLCSegmentName.getSegmentName();
                            createNewSegmentZKMetadata(tableConfig, streamConfig, nextLLCSegmentName, currentTimeMs, new CommittingSegmentDescriptor(segmentName, createStreamMsgOffsetFactory.create(value.getEndOffset()).toString(), 0L), value, consumingInstancePartitions, size, numReplicas);
                            updateInstanceStatesForNewConsumingSegment(mapFields, segmentName, segmentName2, segmentAssignment, singletonMap);
                        } else {
                            // Partition group is no longer in the supplied list: only mark the segment ONLINE.
                            LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. Skipping creation of new ZK metadata and new segment in ideal state", Integer.valueOf(intValue), segmentName);
                            updateInstanceStatesForNewConsumingSegment(mapFields, segmentName, null, segmentAssignment, singletonMap);
                        }
                    }
                } else if (isAllInstancesInState(map2, "OFFLINE")) {
                    LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", segmentName);
                    if (map == null) {
                        map = fetchPartitionGroupIdToSmallestOffset(streamConfig);
                    }
                    // The ZK metadata fallback offset here is the OFFLINE segment's start offset.
                    createNewConsumingSegment(tableConfig, streamConfig, value, currentTimeMs, list, consumingInstancePartitions, mapFields, segmentAssignment, singletonMap, selectStartOffset(offsetCriteria, intValue, hashMap, map, tableConfig.getTableName(), createStreamMsgOffsetFactory, value.getStartOffset()));
                } else if (set.contains(Integer.valueOf(intValue))) {
                    if (z && value.getStatus().isCompleted() && isAllInstancesInState(map2, "ONLINE")) {
                        if (map == null) {
                            map = fetchPartitionGroupIdToSmallestOffset(streamConfig);
                        }
                        // The ZK metadata fallback offset here is the completed segment's end offset.
                        createNewConsumingSegment(tableConfig, streamConfig, value, currentTimeMs, list, consumingInstancePartitions, mapFields, segmentAssignment, singletonMap, selectStartOffset(offsetCriteria, intValue, hashMap, map, tableConfig.getTableName(), createStreamMsgOffsetFactory, value.getEndOffset()));
                    } else {
                        LOGGER.error("Got unexpected instance state map: {} for segment: {}", map2, segmentName);
                    }
                }
            } else if (isExceededMaxSegmentCompletionTime(tableName, segmentName, currentTimeMs)) {
                // The latest segment has ZK metadata but no ideal state entry, and the completion window has passed.
                LOGGER.info("Repairing segment: {} which has segment ZK metadata but does not exist in IdealState", segmentName);
                if (value.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
                    // Find the first ideal state segment of this partition group that still has a CONSUMING instance.
                    String str = null;
                    Iterator<Map.Entry<String, Map<String, String>>> it = mapFields.entrySet().iterator();
                    while (true) {
                        if (!it.hasNext()) {
                            break;
                        }
                        Map.Entry<String, Map<String, String>> next = it.next();
                        if (next.getValue().containsValue("CONSUMING") && new LLCSegmentName(next.getKey()).getPartitionGroupId() == intValue) {
                            str = next.getKey();
                            break;
                        }
                    }
                    if (str == null) {
                        LOGGER.error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", Integer.valueOf(intValue), tableName);
                        this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
                    }
                    // Proceeds even when no previous CONSUMING segment was found (str is null in that case).
                    updateInstanceStatesForNewConsumingSegment(mapFields, str, segmentName, segmentAssignment, singletonMap);
                } else {
                    LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", value.getStatus(), segmentName);
                }
            }
        }
        // Set up the first segment for partition groups that have no segment ZK metadata yet.
        for (PartitionGroupMetadata partitionGroupMetadata2 : list) {
            if (!latestSegmentZKMetadataMap.containsKey(Integer.valueOf(partitionGroupMetadata2.getPartitionGroupId()))) {
                updateInstanceStatesForNewConsumingSegment(mapFields, null, setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata2, currentTimeMs, consumingInstancePartitions, size, numReplicas), segmentAssignment, singletonMap);
            }
        }
        return idealState;
    }

    /**
     * Creates the segment that follows the given latest segment: writes its ZK metadata starting at the given offset
     * and adds it to the instance states map in CONSUMING state.
     *
     * @param tableConfig             table config
     * @param streamConfig            stream config of the table
     * @param segmentZKMetadata       ZK metadata of the partition group's latest segment
     * @param j                       creation time in milliseconds, also used in the new segment name
     * @param list                    partition group metadata list; only its size is used
     * @param instancePartitions      consuming instance partitions
     * @param map                     segment name to (instance to state) map, modified in place
     * @param segmentAssignment       assignment strategy for the new segment
     * @param map2                    instance partitions passed to the assignment strategy
     * @param streamPartitionMsgOffset start offset of the new segment
     */
    private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig, SegmentZKMetadata segmentZKMetadata, long j, List<PartitionGroupMetadata> list, InstancePartitions instancePartitions, Map<String, Map<String, String>> map, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> map2, StreamPartitionMsgOffset streamPartitionMsgOffset) {
        int numReplicas = getNumReplicas(tableConfig, instancePartitions);
        int numPartitionGroups = list.size();
        LLCSegmentName latestLlcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
        LLCSegmentName newLlcSegmentName = getNextLLCSegmentName(latestLlcSegmentName, j);
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentZKMetadata.getSegmentName(), streamPartitionMsgOffset.toString(), 0L);
        createNewSegmentZKMetadata(tableConfig, streamConfig, newLlcSegmentName, j, committingSegmentDescriptor, segmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas);
        String newSegmentName = newLlcSegmentName.getSegmentName();
        updateInstanceStatesForNewConsumingSegment(map, null, newSegmentName, segmentAssignment, map2);
    }

    /**
     * Fetches the start offset of every partition group using the smallest-offset criteria.
     *
     * <p>The stream config's offset criteria is overridden only for the duration of the fetch and is restored
     * afterwards, including when the fetch throws.
     *
     * @param streamConfig stream config of the table
     * @return map from partition group id to the start offset reported under the smallest-offset criteria
     */
    private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
        OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
        List<PartitionGroupMetadata> partitionGroupMetadataList;
        streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
        try {
            partitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
        } finally {
            // Always undo the temporary override on the caller's stream config.
            streamConfig.setOffsetCriteria(originalOffsetCriteria);
        }
        Map<Integer, StreamPartitionMsgOffset> smallestOffsets = new HashMap<>();
        for (PartitionGroupMetadata partitionGroupMetadata : partitionGroupMetadataList) {
            smallestOffsets.put(partitionGroupMetadata.getPartitionGroupId(), partitionGroupMetadata.getStartOffset());
        }
        return smallestOffsets;
    }

    /**
     * Chooses the start offset for a new consuming segment of the given partition group.
     *
     * <p>With a non-null offset criteria the offset fetched for that criteria is returned as is. Otherwise the offset
     * recorded in segment ZK metadata is used, unless the stream's smallest available offset is larger; in that case
     * the data loss is logged and metered and the stream's smallest offset is returned. When the stream reports no
     * smallest offset for the partition group, the ZK metadata offset is returned with a warning instead of failing
     * with a {@link NullPointerException}.
     *
     * @param offsetCriteria                  offset criteria, or {@code null} to use the ZK metadata offset
     * @param i                               partition group id
     * @param map                             partition group id to offset fetched for the offset criteria
     * @param map2                            partition group id to smallest offset available in the stream
     * @param str                             table name, used for logging and metrics
     * @param streamPartitionMsgOffsetFactory factory used to parse the ZK metadata offset
     * @param str2                            offset recorded in segment ZK metadata, in string form
     * @return the offset the new consuming segment should start from
     */
    private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int i, Map<Integer, StreamPartitionMsgOffset> map, Map<Integer, StreamPartitionMsgOffset> map2, String str, StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory, String str2) {
        if (offsetCriteria != null) {
            return map.get(Integer.valueOf(i));
        }
        StreamPartitionMsgOffset offsetInZkMetadata = streamPartitionMsgOffsetFactory.create(str2);
        StreamPartitionMsgOffset smallestStreamOffset = map2.get(Integer.valueOf(i));
        if (smallestStreamOffset == null) {
            // Without the stream's smallest offset, data loss cannot be detected; keep the recorded offset.
            LOGGER.warn("Smallest stream offset is not available for partition: {} of table: {}, using offset: {} from segment ZK metadata", i, str, offsetInZkMetadata);
            return offsetInZkMetadata;
        }
        if (smallestStreamOffset.compareTo(offsetInZkMetadata) <= 0) {
            return offsetInZkMetadata;
        }
        // The stream no longer holds the recorded offset: data between the two offsets is gone.
        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", offsetInZkMetadata, smallestStreamOffset, i, str);
        this._controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
        return smallestStreamOffset;
    }

    /**
     * Builds the name of the segment that follows the given one: same table name and partition group id, sequence
     * number incremented by one.
     *
     * @param lLCSegmentName name of the current segment
     * @param j              time value passed as the last constructor argument of the new segment name
     * @return the next segment name
     */
    private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lLCSegmentName, long j) {
        int nextSequenceNumber = lLCSegmentName.getSequenceNumber() + 1;
        return new LLCSegmentName(lLCSegmentName.getTableName(), lLCSegmentName.getPartitionGroupId(), nextSequenceNumber, j);
    }

    /**
     * Creates the ZK metadata of the first segment (sequence number 0) of a new partition group.
     *
     * @param tableConfig            table config
     * @param streamConfig           stream config of the table
     * @param partitionGroupMetadata metadata of the new partition group; supplies the id and the start offset
     * @param j                      creation time in milliseconds, also used in the segment name
     * @param instancePartitions     consuming instance partitions
     * @param i                      forwarded to {@code createNewSegmentZKMetadata}; callers in this file pass the
     *                               size of the partition group metadata list
     * @param i2                     forwarded to {@code createNewSegmentZKMetadata}; callers in this file pass the
     *                               number of replicas
     * @return the name of the newly created segment
     */
    private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata, long j, InstancePartitions instancePartitions, int i, int i2) {
        String tableNameWithType = tableConfig.getTableName();
        int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
        String startOffset = partitionGroupMetadata.getStartOffset().toString();
        LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, tableNameWithType);
        String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
        LLCSegmentName firstLlcSegmentName = new LLCSegmentName(rawTableName, partitionGroupId, 0, j);
        String firstSegmentName = firstLlcSegmentName.getSegmentName();
        // There is no committing segment for a brand-new partition group, hence the null segment name and metadata.
        CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0L);
        createNewSegmentZKMetadata(tableConfig, streamConfig, firstLlcSegmentName, j, committingSegmentDescriptor, null, instancePartitions, i, i2);
        return firstSegmentName;
    }

    /**
     * Returns the current wall-clock time; package-visible so tests can override it.
     *
     * @return {@link System#currentTimeMillis()}
     */
    @VisibleForTesting
    long getCurrentTimeMs() {
        long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Determines the replica count: the table's replication when the instance partitions have exactly one replica
     * group, otherwise the number of replica groups.
     *
     * @param tableConfig        table config supplying the replication
     * @param instancePartitions instance partitions supplying the number of replica groups
     * @return the number of replicas
     */
    private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
        int numReplicaGroups = instancePartitions.getNumReplicaGroups();
        if (numReplicaGroups == 1) {
            return tableConfig.getReplication();
        }
        return numReplicaGroups;
    }

    /**
     * Computes a ceiling division over the number of instances returned by {@code getInstances(0, 0)}.
     *
     * <p>With exactly one replica group the dividend is {@code i * i2}; otherwise it is {@code i}.
     *
     * @param instancePartitions instance partitions supplying the replica group count and the instances
     * @param i                  presumably the number of partitions (confirm against callers)
     * @param i2                 presumably the number of replicas (confirm against callers)
     * @return the dividend divided by the instance count, rounded up
     */
    private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions, int i, int i2) {
        boolean singleReplicaGroup = instancePartitions.getNumReplicaGroups() == 1;
        int numInstances = instancePartitions.getInstances(0, 0).size();
        int dividend = singleReplicaGroup ? i * i2 : i;
        return (dividend + numInstances - 1) / numInstances;
    }

    /**
     * Schedules asynchronous deep store uploads for committed segments whose download URL is empty.
     *
     * <p>Nothing is done when the table has no retention time unit or value configured. For every segment with
     * status DONE and an empty download URL, the upload is skipped when the segment is purgeable under a retention
     * that is 3600000 ms shorter than the configured one, or when an upload for the segment is already pending.
     * Otherwise a task is submitted that asks a randomly chosen peer server to upload the segment, moves the uploaded
     * file, and persists the resulting download URL in the segment ZK metadata.
     *
     * <p>A failure while checking or scheduling one segment is logged and does not affect the remaining segments.
     * A segment is removed from the pending set when its task finishes (successfully or not) or when the task could
     * not be submitted.
     *
     * @param tableConfig table config of the realtime table
     * @param list        ZK metadata of the segments to check
     * @throws IllegalStateException if the segment manager is stopping
     */
    public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMetadata> list) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        String tableName = tableConfig.getTableName();
        String extractRawTableName = TableNameBuilder.extractRawTableName(tableName);
        SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
        if (validationConfig.getRetentionTimeUnit() == null || validationConfig.getRetentionTimeUnit().isEmpty() || validationConfig.getRetentionTimeValue() == null || validationConfig.getRetentionTimeValue().isEmpty()) {
            return;
        }
        // Segments that would expire within 3600000 ms are treated as already out of retention.
        TimeRetentionStrategy timeRetentionStrategy = new TimeRetentionStrategy(TimeUnit.MILLISECONDS, TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase()).toMillis(Long.parseLong(validationConfig.getRetentionTimeValue())) - 3600000);
        PinotFS create = PinotFSFactory.create(URIUtils.getUri(this._controllerConf.getDataDir()).getScheme());
        for (SegmentZKMetadata segmentZKMetadata : list) {
            String segmentName = segmentZKMetadata.getSegmentName();
            try {
                // Only committed segments without a deep store copy need fixing.
                if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE || !"".equals(segmentZKMetadata.getDownloadUrl())) {
                    continue;
                }
                if (timeRetentionStrategy.isPurgeable(tableName, segmentZKMetadata)) {
                    LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
                    continue;
                }
                if (!this._deepStoreUploadExecutorPendingSegments.add(segmentName)) {
                    // An upload for this segment is already queued; just refresh the queue size gauge.
                    this._controllerMetrics.setOrUpdateGauge(ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), this._deepStoreUploadExecutorPendingSegments.size());
                    continue;
                }
                try {
                    this._deepStoreUploadExecutor.submit(() -> {
                        try {
                            LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
                            List peerServerURIs = PeerServerSegmentFinder.getPeerServerURIs(segmentName, "http", this._helixManager);
                            if (peerServerURIs.isEmpty()) {
                                throw new IllegalStateException(String.format("Failed to upload segment %s to deep store because no online replica is found", segmentName));
                            }
                            String format = String.format("%s?uploadTimeoutMs=%d", StringUtil.join("/", new String[]{((URI) peerServerURIs.get(RANDOM.nextInt(peerServerURIs.size()))).toString(), "upload"}), Integer.valueOf(this._deepstoreUploadRetryTimeoutMs));
                            LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName, format);
                            String moveSegmentFile = moveSegmentFile(extractRawTableName, segmentName, this._fileUploadDownloadClient.uploadToSegmentStore(format), create);
                            LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, moveSegmentFile);
                            segmentZKMetadata.setDownloadUrl(moveSegmentFile);
                            persistSegmentZKMetadata(tableName, segmentZKMetadata, -1);
                            LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName, moveSegmentFile);
                            this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_SUCCESS, 1L);
                        } catch (Exception uploadFailure) {
                            this._controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
                            LOGGER.error("Failed to upload segment {} to deep store", segmentName, uploadFailure);
                        } finally {
                            // Whatever the outcome, the segment is no longer pending.
                            this._deepStoreUploadExecutorPendingSegments.remove(segmentName);
                            this._controllerMetrics.setOrUpdateGauge(ControllerGauge.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_QUEUE_SIZE.getGaugeName(), this._deepStoreUploadExecutorPendingSegments.size());
                        }
                    });
                } catch (RuntimeException submitFailure) {
                    // The task will never run, so it cannot clear its own pending entry.
                    this._deepStoreUploadExecutorPendingSegments.remove(segmentName);
                    throw submitFailure;
                }
            } catch (Exception e) {
                LOGGER.warn("Failed checking segment deep store URL for segment {}", segmentName, e);
            }
        }
    }

    /**
     * Test hook reporting whether any deep store upload is still pending.
     *
     * @return {@code true} when the pending-segments collection is empty
     */
    @VisibleForTesting
    boolean deepStoreUploadExecutorPendingSegmentsIsEmpty() {
        boolean noPendingUploads = this._deepStoreUploadExecutorPendingSegments.isEmpty();
        return noPendingUploads;
    }

    /**
     * Deletes eligible temporary segment files from the table's directory under the controller data dir.
     *
     * <p>Returns 0 without touching the file system when the table is not a realtime table resource, when its table
     * config cannot be found, or when async tmp segment deletion is disabled. Download URLs of DONE segments are never
     * deleted. Any exception raised while listing or deleting stops the scan and is logged; files deleted up to that
     * point are still counted.
     *
     * @param str  table name with type
     * @param list ZK metadata of the table's segments
     * @return the number of files deleted
     * @throws IllegalStateException if the segment manager is stopping
     */
    public long deleteTmpSegments(String str, List<SegmentZKMetadata> list) {
        Preconditions.checkState(!this._isStopping, "Segment manager is stopping");
        if (!TableNameBuilder.isRealtimeTableResource(str)) {
            return 0L;
        }
        if (this._helixResourceManager.getTableConfig(str) == null) {
            LOGGER.warn("Failed to find table config for table: {}, skipping deletion of tmp segments", str);
            return 0L;
        }
        if (!isTmpSegmentAsyncDeletionEnabled()) {
            return 0L;
        }
        // Files referenced as the download URL of a committed segment must be kept.
        Set<String> deepStoreUrls = list.stream()
                .filter(metadata -> metadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE && !"".equals(metadata.getDownloadUrl()))
                .map(SegmentZKMetadata::getDownloadUrl)
                .collect(Collectors.toSet());
        String rawTableName = TableNameBuilder.extractRawTableName(str);
        URI tableDirUri = URIUtils.getUri(this._controllerConf.getDataDir(), new String[]{rawTableName});
        PinotFS pinotFS = PinotFSFactory.create(tableDirUri.getScheme());
        long numDeleted = 0;
        try {
            for (String filePath : pinotFS.listFiles(tableDirUri, false)) {
                URI fileUri = URIUtils.getUri(filePath);
                if (!isTmpAndCanDelete(fileUri, deepStoreUrls, pinotFS)) {
                    continue;
                }
                LOGGER.info("Deleting temporary segment file: {}", fileUri);
                if (pinotFS.delete(fileUri, true)) {
                    LOGGER.info("Succeed to delete file: {}", fileUri);
                    numDeleted++;
                } else {
                    LOGGER.warn("Failed to delete file: {}", fileUri);
                }
            }
        } catch (Exception e) {
            LOGGER.warn("Caught exception while deleting temporary files for table: {}", rawTableName, e);
        }
        return numDeleted;
    }

    /**
     * Decides whether a file is a temporary segment file that may be deleted.
     *
     * <p>A file qualifies when its modification time is positive, {@code SegmentCompletionUtils.isTmpFile} accepts its
     * URI string, it is not one of the given download URLs, and it is older than the configured tmp segment retention.
     *
     * @param uri     URI of the candidate file
     * @param set     download URLs that must not be deleted
     * @param pinotFS file system used to read the modification time
     * @return {@code true} when the file may be deleted
     * @throws Exception if reading the modification time fails
     */
    private boolean isTmpAndCanDelete(URI uri, Set<String> set, PinotFS pinotFS) throws Exception {
        long lastModified = pinotFS.lastModified(uri);
        if (lastModified <= 0) {
            LOGGER.warn("file {} modification time {} is not positive, ineligible for delete", uri.toString(), lastModified);
            return false;
        }
        String filePath = uri.toString();
        if (!SegmentCompletionUtils.isTmpFile(filePath)) {
            return false;
        }
        if (set.contains(filePath)) {
            return false;
        }
        long ageMs = getCurrentTimeMs() - lastModified;
        long retentionMs = ((long) this._controllerConf.getTmpSegmentRetentionInSeconds()) * 1000;
        return ageMs > retentionMs;
    }

    /**
     * Sends a force commit message for the table's consuming segments, optionally restricted to given partitions or
     * segments.
     *
     * @param str  table name with type
     * @param str2 comma-separated partition group ids to restrict the commit to, or {@code null}
     * @param str3 comma-separated segment names to restrict the commit to, or {@code null}
     * @return the names of the segments the force commit was requested for
     */
    public Set<String> forceCommit(String str, @Nullable String str2, @Nullable String str3) {
        IdealState idealState = getIdealState(str);
        Set<String> consumingSegments = findConsumingSegments(idealState);
        Set<String> segmentsToCommit = filterSegmentsToCommit(consumingSegments, str2, str3);
        sendForceCommitMessageToServers(str, segmentsToCommit);
        return segmentsToCommit;
    }

    /**
     * Narrows the set of consuming segments to the ones that should be committed.
     *
     * <p>An explicit segment list takes precedence over a partition list. With neither given, all consuming segments
     * are returned.
     *
     * @param set  all consuming segments
     * @param str  comma-separated partition group ids, or {@code null}
     * @param str2 comma-separated segment names, or {@code null}
     * @return the segments to commit
     * @throws IllegalStateException if a requested segment is not consuming, or no consuming segment belongs to the
     *                               requested partitions
     */
    private Set<String> filterSegmentsToCommit(Set<String> set, @Nullable String str, @Nullable String str2) {
        if (str2 != null) {
            Set<String> requestedSegments = Arrays.stream(str2.split(",")).map(String::trim).collect(Collectors.toSet());
            Preconditions.checkState(set.containsAll(requestedSegments), "Cannot commit segments that are not in CONSUMING state. All consuming segments: %s, provided segments to commit: %s", set, str2);
            return requestedSegments;
        }
        if (str == null) {
            return set;
        }
        Set<Integer> requestedPartitions = Arrays.stream(str.split(",")).map(String::trim).map(Integer::parseInt).collect(Collectors.toSet());
        Set<String> matchingSegments = set.stream()
                .filter(segmentName -> requestedPartitions.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))
                .collect(Collectors.toSet());
        Preconditions.checkState(!matchingSegments.isEmpty(), "Cannot find segments to commit for partitions: %s", str);
        return matchingSegments;
    }

    /**
     * Sets the pause flag in the table's ideal state and asks servers to force commit all consuming segments.
     *
     * @param str table name with type
     * @return a pause status listing the consuming segments found; the description is {@code null} when there were
     *         none
     */
    public PauseStatus pauseConsumption(String str) {
        IdealState updatedIdealState = updatePauseStatusInIdealState(str, true);
        Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
        sendForceCommitMessageToServers(str, consumingSegments);
        String description;
        if (consumingSegments.isEmpty()) {
            description = null;
        } else {
            description = "Pause flag is set. Consuming segments are being committed. Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.";
        }
        return new PauseStatus(true, consumingSegments, description);
    }

    /**
     * Clears the pause flag in the table's ideal state and triggers the realtime segment validation periodic task so
     * that consuming segments get recreated.
     *
     * @param str  table name with type
     * @param str2 offset criteria forwarded to the periodic task, or {@code null} to omit it
     * @return a pause status listing the consuming segments present in the updated ideal state
     */
    public PauseStatus resumeConsumption(String str, @Nullable String str2) {
        IdealState updatedIdealState = updatePauseStatusInIdealState(str, false);
        Map<String, String> taskProperties = new HashMap<>();
        taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
        if (str2 != null) {
            taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, str2);
        }
        this._helixResourceManager.invokeControllerPeriodicTask(str, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
        Set<String> consumingSegments = findConsumingSegments(updatedIdealState);
        return new PauseStatus(false, consumingSegments, "Pause flag is cleared. Consuming segments are being created. Use /pauseStatus endpoint in a few moments to double check.");
    }

    /**
     * Writes the pause flag into the simple fields of the table's ideal state.
     *
     * @param str name of the table whose ideal state is updated
     * @param z the value to store under the {@code IS_TABLE_PAUSED} simple field
     * @return the ideal state returned by {@code HelixHelper.updateIdealState}
     */
    private IdealState updatePauseStatusInIdealState(String str, boolean z) {
        String pausedFlag = Boolean.toString(z);
        IdealState updated = HelixHelper.updateIdealState(this._helixManager, str, current -> {
            ZNRecord znRecord = current.getRecord();
            znRecord.setSimpleField(IS_TABLE_PAUSED, pausedFlag);
            return new IdealState(znRecord);
        }, RetryPolicies.noDelayRetryPolicy(1));
        LOGGER.info("Set 'isTablePaused' to {} in the Ideal State for table {}.", z, str);
        return updated;
    }

    /**
     * Sends a {@link ForceCommitMessage} for the given segments to the participant instances of the
     * table resource. Does nothing when the segment set is empty.
     *
     * @param str name of the table (used as the Helix resource in the recipient criteria)
     * @param set names of the segments to force-commit
     * @throws RuntimeException if the messaging service reports that no message was sent
     */
    private void sendForceCommitMessageToServers(String str, Set<String> set) {
        if (!set.isEmpty()) {
            // Address every participant instance (wildcard name) of this table resource.
            Criteria recipients = new Criteria();
            recipients.setInstanceName("%");
            recipients.setRecipientInstanceType(InstanceType.PARTICIPANT);
            recipients.setResource(str);
            recipients.setSessionSpecific(true);

            ForceCommitMessage message = new ForceCommitMessage(str, set);
            int messagesSent =
                this._helixManager.getMessagingService().send(recipients, message, (AsyncCallback) null, -1);
            if (messagesSent > 0) {
                LOGGER.info("Sent {} force commit messages for table: {} segments: {}", messagesSent, str, set);
            } else {
                throw new RuntimeException(
                    String.format("No force commit message was sent for table: %s segments: %s", str, set));
            }
        }
    }

    /**
     * Collects the names of all segments that have at least one instance in the CONSUMING state.
     *
     * @param idealState ideal state whose map fields (segment name to instance-state map) are scanned
     * @return a sorted set of the matching segment names; empty when none is consuming
     */
    private Set<String> findConsumingSegments(IdealState idealState) {
        Set<String> consumingSegments = new TreeSet<>();
        Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
        for (Map.Entry<String, Map<String, String>> segmentEntry : segmentAssignment.entrySet()) {
            for (String state : segmentEntry.getValue().values()) {
                if (state.equals("CONSUMING")) {
                    consumingSegments.add(segmentEntry.getKey());
                    // One consuming replica is enough; move on to the next segment.
                    break;
                }
            }
        }
        return consumingSegments;
    }

    /**
     * Reports the current pause state of a table.
     *
     * @param str name of the table whose ideal state is read
     * @return a {@link PauseStatus} carrying the parsed {@code IS_TABLE_PAUSED} flag (false when the
     *     field is absent) and the segments currently in CONSUMING state; the description is null
     */
    public PauseStatus getPauseStatus(String str) {
        IdealState tableIdealState = getIdealState(str);
        boolean paused = Boolean.parseBoolean(tableIdealState.getRecord().getSimpleField(IS_TABLE_PAUSED));
        Set<String> consumingSegments = findConsumingSegments(tableIdealState);
        return new PauseStatus(paused, consumingSegments, null);
    }

    /**
     * Moves a segment file to the path computed by {@link #createSegmentPath(String, String)}.
     *
     * @param str first path component passed to {@code createSegmentPath} (presumably the table name)
     * @param str2 segment name
     * @param str3 current location of the segment file, as a URI string
     * @param pinotFS file system used to perform the move
     * @return the destination URI as a string
     * @throws IOException if the file system operation fails
     * @throws IllegalStateException if the file system reports that the move did not succeed
     */
    @VisibleForTesting
    String moveSegmentFile(String str, String str2, String str3, PinotFS pinotFS) throws IOException {
        URI source = URIUtils.getUri(str3);
        URI destination = createSegmentPath(str, str2);
        boolean moved = pinotFS.move(source, destination, true);
        Preconditions.checkState(moved, "Failed to move segment file for segment: %s from: %s to: %s", str2, str3, destination);
        return destination.toString();
    }

    /**
     * Builds the URI of a segment file under the controller's data directory.
     *
     * @param str first path component below the data directory (presumably the table name)
     * @param str2 segment name; it is passed through {@code URIUtils.encode} before being appended
     * @return the URI composed from the data directory, {@code str} and the encoded segment name
     */
    @VisibleForTesting
    URI createSegmentPath(String str, String str2) {
        String dataDir = this._controllerConf.getDataDir();
        String[] pathParts = {str, URIUtils.encode(str2)};
        return URIUtils.getUri(dataDir, pathParts);
    }

    // Class-level initialization of the static fields assigned below.
    static {
        // NOTE(review): `$assertionsDisabled` looks like the compiler-synthesized flag backing `assert`
        // statements (i.e. a decompilation artifact) — confirm before keeping it in hand-written source.
        $assertionsDisabled = !PinotLLCRealtimeSegmentManager.class.desiredAssertionStatus();
        // Logger bound to this class.
        LOGGER = LoggerFactory.getLogger(PinotLLCRealtimeSegmentManager.class);
        // Shared Random instance created without an explicit seed.
        RANDOM = new Random();
    }
}
