package org.apache.pinot.controller.helix.core.realtime;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.class */
public class MissingConsumingSegmentFinder {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MissingConsumingSegmentFinder.class);
    private final String _realtimeTableName;
    private final SegmentMetadataFetcher _segmentMetadataFetcher;
    private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
    private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
    private ControllerMetrics _controllerMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder$MissingSegmentInfo.class */
    public static class MissingSegmentInfo {
        long _totalCount;
        long _newPartitionGroupCount;
        long _maxDurationInMinutes;

        MissingSegmentInfo() {
        }
    }

    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder$SegmentMetadataFetcher.class */
    static class SegmentMetadataFetcher {
        private ZkHelixPropertyStore<ZNRecord> _propertyStore;
        private ControllerMetrics _controllerMetrics;

        public SegmentMetadataFetcher(ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore, ControllerMetrics controllerMetrics) {
            this._propertyStore = zkHelixPropertyStore;
            this._controllerMetrics = controllerMetrics;
        }

        public SegmentZKMetadata fetchSegmentZkMetadata(String str, String str2) {
            return fetchSegmentZkMetadata(str, str2, null);
        }

        public long fetchSegmentCompletionTime(String str, String str2) {
            Stat stat = new Stat();
            fetchSegmentZkMetadata(str, str2, stat);
            return stat.getMtime();
        }

        private SegmentZKMetadata fetchSegmentZkMetadata(String str, String str2, Stat stat) {
            try {
                ZNRecord zNRecord = this._propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(str, str2), stat, AccessOption.PERSISTENT);
                Preconditions.checkState(zNRecord != null, "Failed to find segment ZK metadata for segment: %s of table: %s", str2, str);
                return new SegmentZKMetadata(zNRecord);
            } catch (Exception e) {
                this._controllerMetrics.addMeteredTableValue(str, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
                throw e;
            }
        }
    }

    public MissingConsumingSegmentFinder(String str, ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore, ControllerMetrics controllerMetrics, PartitionLevelStreamConfig partitionLevelStreamConfig) {
        this._realtimeTableName = str;
        this._controllerMetrics = controllerMetrics;
        this._segmentMetadataFetcher = new SegmentMetadataFetcher(zkHelixPropertyStore, controllerMetrics);
        this._streamPartitionMsgOffsetFactory = StreamConsumerFactoryProvider.create(partitionLevelStreamConfig).createStreamMsgOffsetFactory();
        this._partitionGroupIdToLargestStreamOffsetMap = new HashMap();
        partitionLevelStreamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
        try {
            PinotTableIdealStateBuilder.getPartitionGroupMetadataList(partitionLevelStreamConfig, Collections.emptyList()).forEach(partitionGroupMetadata -> {
                this._partitionGroupIdToLargestStreamOffsetMap.put(Integer.valueOf(partitionGroupMetadata.getPartitionGroupId()), partitionGroupMetadata.getStartOffset());
            });
        } catch (Exception e) {
            LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. Continue finding missing consuming segment only with ideal state information.", partitionLevelStreamConfig.getTopicName(), partitionLevelStreamConfig.getTableNameWithType());
        }
    }

    @VisibleForTesting
    MissingConsumingSegmentFinder(String str, SegmentMetadataFetcher segmentMetadataFetcher, Map<Integer, StreamPartitionMsgOffset> map, StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
        this._realtimeTableName = str;
        this._segmentMetadataFetcher = segmentMetadataFetcher;
        this._partitionGroupIdToLargestStreamOffsetMap = map;
        this._streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
    }

    public void findAndEmitMetrics(IdealState idealState) {
        MissingSegmentInfo findMissingSegments = findMissingSegments(idealState.getRecord().getMapFields(), Instant.now());
        this._controllerMetrics.setValueOfTableGauge(this._realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT, findMissingSegments._totalCount);
        this._controllerMetrics.setValueOfTableGauge(this._realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT, findMissingSegments._newPartitionGroupCount);
        this._controllerMetrics.setValueOfTableGauge(this._realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES, findMissingSegments._maxDurationInMinutes);
    }

    @VisibleForTesting
    MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> map, Instant instant) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        map.forEach((str, map2) -> {
            LLCSegmentName of = LLCSegmentName.of(str);
            if (of != null) {
                if (map2.containsValue("CONSUMING")) {
                    updateMap(hashMap, of);
                } else if (map2.containsValue("ONLINE")) {
                    updateMap(hashMap2, of);
                }
            }
        });
        MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo();
        if (this._partitionGroupIdToLargestStreamOffsetMap.isEmpty()) {
            hashMap2.forEach((num, lLCSegmentName) -> {
                if (hashMap.containsKey(num)) {
                    return;
                }
                missingSegmentInfo._totalCount++;
                updateMaxDurationInfo(missingSegmentInfo, num, this._segmentMetadataFetcher.fetchSegmentCompletionTime(this._realtimeTableName, lLCSegmentName.getSegmentName()), instant);
            });
        } else {
            this._partitionGroupIdToLargestStreamOffsetMap.forEach((num2, streamPartitionMsgOffset) -> {
                if (hashMap.containsKey(num2)) {
                    return;
                }
                LLCSegmentName lLCSegmentName2 = (LLCSegmentName) hashMap2.get(num2);
                if (lLCSegmentName2 == null) {
                    missingSegmentInfo._newPartitionGroupCount++;
                    missingSegmentInfo._totalCount++;
                    return;
                }
                SegmentZKMetadata fetchSegmentZkMetadata = this._segmentMetadataFetcher.fetchSegmentZkMetadata(this._realtimeTableName, lLCSegmentName2.getSegmentName());
                if (this._streamPartitionMsgOffsetFactory.create(fetchSegmentZkMetadata.getEndOffset()).compareTo(streamPartitionMsgOffset) < 0) {
                    missingSegmentInfo._totalCount++;
                    updateMaxDurationInfo(missingSegmentInfo, num2, fetchSegmentZkMetadata.getCreationTime(), instant);
                }
            });
        }
        return missingSegmentInfo;
    }

    private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer num, long j, Instant instant) {
        long minutes = Duration.between(Instant.ofEpochMilli(j), instant).toMinutes();
        if (minutes > missingSegmentInfo._maxDurationInMinutes) {
            missingSegmentInfo._maxDurationInMinutes = minutes;
        }
        LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", num, Long.valueOf(minutes));
    }

    private void updateMap(Map<Integer, LLCSegmentName> map, LLCSegmentName lLCSegmentName) {
        map.compute(Integer.valueOf(lLCSegmentName.getPartitionGroupId()), (num, lLCSegmentName2) -> {
            if (lLCSegmentName2 != null && lLCSegmentName2.getSequenceNumber() > lLCSegmentName.getSequenceNumber()) {
                return lLCSegmentName2;
            }
            return lLCSegmentName;
        });
    }
}
