package org.apache.pinot.core.data.manager.realtime;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.$internal.com.google.common.cache.Cache;
import org.apache.pinot.$internal.com.google.common.cache.CacheBuilder;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.class */
/**
 * Tracks, per stream partition, the latest ingestion timestamps and offsets reported through
 * {@link #updateIngestionMetrics} and publishes them as partition gauges on {@link ServerMetrics}
 * ({@code REALTIME_INGESTION_DELAY_MS}, {@code END_TO_END_REALTIME_INGESTION_DELAY_MS} and
 * {@code REALTIME_INGESTION_OFFSET_LAG}).
 *
 * <p>A scheduled task periodically runs {@link #timeoutInactivePartitions()}, which stops tracking
 * partitions that were marked via {@link #markPartitionForVerification(String)} more than
 * {@link #PARTITION_TIMEOUT_MS} ago and that are no longer reported by
 * {@code RealtimeTableDataManager.getHostedPartitionsGroupIds()}.
 *
 * <p>Tracker state lives in {@link ConcurrentHashMap}s and per-partition updates go through
 * {@link Map#compute}, so updates for the same partition are applied atomically.
 */
public class IngestionDelayTracker {
    private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class);

    /** Default interval between two runs of {@link #timeoutInactivePartitions()}. */
    private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000;
    /** Minimum time a partition must stay marked for verification before it may be removed. */
    private static final int PARTITION_TIMEOUT_MS = 600000;
    /** Delay before the first run of the scheduled verification task. */
    private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
    /** Time after the last access for which a segment stays in the ignore cache. */
    private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;

    /** Latest reported ingestion state, keyed by partition group id. */
    private final Map<Integer, IngestionInfo> _ingestionInfoMap;
    /** Partition group id to the clock time (ms) at which it was marked for verification. */
    private final Map<Integer, Long> _partitionsMarkedForVerification;
    /** Segment names whose updates must be ignored; entries expire after a period without access. */
    private final Cache<String, Boolean> _segmentsToIgnore;
    private final ScheduledExecutorService _scheduledExecutor;
    private final ServerMetrics _serverMetrics;
    private final String _tableNameWithType;
    private final String _metricName;
    private final RealtimeTableDataManager _realTimeTableDataManager;
    private final Supplier<Boolean> _isServerReadyToServeQueries;
    private Clock _clock;

    /**
     * Immutable snapshot of the last values reported for one partition.
     */
    public static class IngestionInfo {
        final long _ingestionTimeMs;
        final long _firstStreamIngestionTimeMs;
        final StreamPartitionMsgOffset _currentOffset;
        final StreamPartitionMsgOffset _latestOffset;

        /**
         * @param j value stored as the ingestion time in ms
         * @param j2 value stored as the first-stream ingestion time in ms
         * @param streamPartitionMsgOffset value stored as the current offset, may be {@code null}
         * @param streamPartitionMsgOffset2 value stored as the latest offset, may be {@code null}
         */
        IngestionInfo(long j, long j2, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset2) {
            this._ingestionTimeMs = j;
            this._firstStreamIngestionTimeMs = j2;
            this._currentOffset = streamPartitionMsgOffset;
            this._latestOffset = streamPartitionMsgOffset2;
        }
    }

    /**
     * Creates a tracker and schedules the periodic verification task.
     *
     * @param serverMetrics metrics registry on which partition gauges are set and removed
     * @param str table name with type; also used as the metric name and, with the type stripped,
     *     in the timer thread name
     * @param realtimeTableDataManager queried for shutdown state and for hosted partition group ids
     * @param i delay in milliseconds between runs of the verification task; must be {@code > 0}
     * @param supplier returns whether the server is ready to serve queries; tracker state is not
     *     updated while it returns {@code false}
     * @throws IllegalArgumentException (a {@link RuntimeException}) if {@code i <= 0}
     */
    @VisibleForTesting
    public IngestionDelayTracker(ServerMetrics serverMetrics, final String str, RealtimeTableDataManager realtimeTableDataManager, int i, Supplier<Boolean> supplier) throws RuntimeException {
        // Validate before creating the executor or touching any other state.
        if (i <= 0) {
            throw new IllegalArgumentException("Illegal timer timeout argument, expected > 0, got=" + i + " for table=" + str);
        }
        _ingestionInfoMap = new ConcurrentHashMap<>();
        _partitionsMarkedForVerification = new ConcurrentHashMap<>();
        _segmentsToIgnore = CacheBuilder.newBuilder()
                .expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES)
                .build();
        _serverMetrics = serverMetrics;
        _tableNameWithType = str;
        _metricName = str;
        _realTimeTableDataManager = realtimeTableDataManager;
        _clock = Clock.systemUTC();
        _isServerReadyToServeQueries = supplier;

        // Hand the naming thread factory to the pool at creation time instead of downcasting the
        // executor afterwards to install it.
        final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
        _scheduledExecutor = Executors.newScheduledThreadPool(2, runnable -> {
            Thread timerThread = defaultFactory.newThread(runnable);
            timerThread.setName("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(str));
            return timerThread;
        });
        _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
                INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, i, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a tracker that runs the verification task every
     * {@link #SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS} milliseconds.
     *
     * @param serverMetrics metrics registry on which partition gauges are set and removed
     * @param str table name with type
     * @param realtimeTableDataManager queried for shutdown state and for hosted partition group ids
     * @param supplier returns whether the server is ready to serve queries
     */
    public IngestionDelayTracker(ServerMetrics serverMetrics, String str, RealtimeTableDataManager realtimeTableDataManager, Supplier<Boolean> supplier) {
        this(serverMetrics, str, realtimeTableDataManager, SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS, supplier);
    }

    /**
     * Converts a timestamp into a delay relative to the tracker clock.
     *
     * @param timestampMs timestamp in milliseconds, compared against {@code _clock.millis()}
     * @return {@code 0} for negative timestamps, otherwise the elapsed time clamped to be non-negative
     */
    private long getIngestionDelayMs(long timestampMs) {
        if (timestampMs < 0) {
            return 0L;
        }
        return Math.max(_clock.millis() - timestampMs, 0L);
    }

    /**
     * Stops tracking a partition: removes its three gauges (only if it was tracked), drops its
     * ingestion info and clears any pending verification mark.
     *
     * @param partitionId partition group id to remove
     */
    private void removePartitionId(int partitionId) {
        _ingestionInfoMap.compute(partitionId, (key, trackedInfo) -> {
            if (trackedInfo != null) {
                _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
                _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
                _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
            }
            // Returning null removes the mapping (or leaves it absent).
            return null;
        });
        _partitionsMarkedForVerification.remove(partitionId);
    }

    /**
     * @return the partition group ids that have been marked for verification for longer than
     *     {@link #PARTITION_TIMEOUT_MS}
     */
    private List<Integer> getPartitionsToBeVerified() {
        List<Integer> expiredPartitions = new ArrayList<>();
        long nowMs = _clock.millis();
        for (Map.Entry<Integer, Long> marked : _partitionsMarkedForVerification.entrySet()) {
            if (nowMs - marked.getValue() > PARTITION_TIMEOUT_MS) {
                expiredPartitions.add(marked.getKey());
            }
        }
        return expiredPartitions;
    }

    /**
     * Replaces the clock used for delay and timeout computations.
     *
     * @param clock clock to use from now on
     */
    @VisibleForTesting
    void setClock(Clock clock) {
        _clock = clock;
    }

    /**
     * Records the latest ingestion state for a partition and, the first time the partition is seen,
     * registers the gauges for which valid data was supplied.
     *
     * <p>The call is a no-op while the server is not ready to serve queries, after the table data
     * manager has been shut down, or when both timestamps are negative and at least one offset is
     * {@code null}. Updates for a segment present in the ignore cache leave the stored state as is.
     * Any pending verification mark for the partition is cleared.
     *
     * @param str name of the segment reporting the update; checked against the ignore cache
     * @param i partition group id
     * @param j ingestion time in ms, used for {@code REALTIME_INGESTION_DELAY_MS}
     * @param j2 first-stream ingestion time in ms, used for
     *     {@code END_TO_END_REALTIME_INGESTION_DELAY_MS}
     * @param streamPartitionMsgOffset current offset, may be {@code null}
     * @param streamPartitionMsgOffset2 latest offset, may be {@code null}
     */
    public void updateIngestionMetrics(String str, int i, long j, long j2, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset2) {
        if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) {
            return;
        }
        // Readable aliases for the positional parameters.
        final long ingestionTimeMs = j;
        final long firstStreamIngestionTimeMs = j2;
        final StreamPartitionMsgOffset currentOffset = streamPartitionMsgOffset;
        final StreamPartitionMsgOffset latestOffset = streamPartitionMsgOffset2;
        final boolean hasOffsets = currentOffset != null && latestOffset != null;

        if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && !hasOffsets) {
            // Nothing usable was reported.
            return;
        }

        _ingestionInfoMap.compute(i, (key, previous) -> {
            if (_segmentsToIgnore.getIfPresent(str) != null) {
                return previous;
            }
            if (previous == null) {
                // NOTE(review): gauges are registered only on the first update for a partition and
                // require a strictly positive timestamp, while the guard above accepts 0. A gauge
                // skipped here is not registered by later updates - confirm this is intended.
                if (ingestionTimeMs > 0) {
                    _serverMetrics.setOrUpdatePartitionGauge(_metricName, i, ServerGauge.REALTIME_INGESTION_DELAY_MS,
                            () -> getPartitionIngestionDelayMs(i));
                }
                if (firstStreamIngestionTimeMs > 0) {
                    _serverMetrics.setOrUpdatePartitionGauge(_metricName, i, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
                            () -> getPartitionEndToEndIngestionDelayMs(i));
                }
                if (hasOffsets) {
                    _serverMetrics.setOrUpdatePartitionGauge(_metricName, i, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
                            () -> getPartitionIngestionOffsetLag(i));
                }
            }
            return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset);
        });
        _partitionsMarkedForVerification.remove(i);
    }

    /**
     * Stops tracking the given partition and removes its gauges.
     *
     * @param i partition group id
     */
    public void stopTrackingPartitionIngestionDelay(int i) {
        removePartitionId(i);
    }

    /**
     * Adds the segment to the ignore cache, then stops tracking the partition derived from the
     * segment name.
     *
     * @param str segment name, parsed with {@link LLCSegmentName} to obtain the partition group id
     */
    public void stopTrackingPartitionIngestionDelay(String str) {
        _segmentsToIgnore.put(str, true);
        removePartitionId(new LLCSegmentName(str).getPartitionGroupId());
    }

    /**
     * Stops tracking every partition whose verification mark has timed out and that is not among
     * the partition group ids currently reported by the table data manager. Does nothing while the
     * server is not ready to serve queries.
     *
     * <p>This method is also the body of the task scheduled in the constructor.
     */
    public void timeoutInactivePartitions() {
        if (!_isServerReadyToServeQueries.get()) {
            return;
        }
        List<Integer> partitionsToVerify = getPartitionsToBeVerified();
        if (partitionsToVerify.isEmpty()) {
            return;
        }
        // The catch-all deliberately spans the removal loop as well: an exception escaping a task
        // scheduled with scheduleWithFixedDelay suppresses all of its subsequent executions.
        try {
            Set<Integer> hostedPartitions = _realTimeTableDataManager.getHostedPartitionsGroupIds();
            for (Integer partitionId : partitionsToVerify) {
                if (!hostedPartitions.contains(partitionId)) {
                    removePartitionId(partitionId);
                }
            }
        } catch (Exception e) {
            // The trailing throwable is not consumed by a placeholder, so SLF4J logs its stack trace.
            LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, e.getClass(), e.getMessage(), e);
        }
    }

    /**
     * Marks the partition of the given segment for verification at the current clock time, unless
     * the server is not ready to serve queries or the segment is in the ignore cache.
     *
     * @param str segment name, parsed with {@link LLCSegmentName} to obtain the partition group id
     */
    public void markPartitionForVerification(String str) {
        if (!_isServerReadyToServeQueries.get() || _segmentsToIgnore.getIfPresent(str) != null) {
            return;
        }
        _partitionsMarkedForVerification.put(new LLCSegmentName(str).getPartitionGroupId(), _clock.millis());
    }

    /**
     * @param i partition group id
     * @return the last recorded ingestion time in ms, or {@link Long#MIN_VALUE} if the partition is
     *     not tracked
     */
    public long getPartitionIngestionTimeMs(int i) {
        IngestionInfo info = _ingestionInfoMap.get(i);
        return info == null ? Long.MIN_VALUE : info._ingestionTimeMs;
    }

    /**
     * @param i partition group id
     * @return the delay in ms between the tracker clock and the last recorded ingestion time, or
     *     {@code 0} if the partition is not tracked or the recorded time is negative
     */
    public long getPartitionIngestionDelayMs(int i) {
        IngestionInfo info = _ingestionInfoMap.get(i);
        return info == null ? 0L : getIngestionDelayMs(info._ingestionTimeMs);
    }

    /**
     * @param i partition group id
     * @return the delay in ms between the tracker clock and the last recorded first-stream ingestion
     *     time, or {@code 0} if the partition is not tracked or the recorded time is negative
     */
    public long getPartitionEndToEndIngestionDelayMs(int i) {
        IngestionInfo info = _ingestionInfoMap.get(i);
        return info == null ? 0L : getIngestionDelayMs(info._firstStreamIngestionTimeMs);
    }

    /**
     * @param i partition group id
     * @return latest offset minus current offset when both are {@link LongMsgOffset}s; {@code 0} if
     *     the partition is not tracked or either offset is missing or of another type
     */
    public long getPartitionIngestionOffsetLag(int i) {
        IngestionInfo info = _ingestionInfoMap.get(i);
        if (info == null) {
            return 0L;
        }
        StreamPartitionMsgOffset currentOffset = info._currentOffset;
        StreamPartitionMsgOffset latestOffset = info._latestOffset;
        // instanceof is false for null, so no separate null checks are needed.
        if (!(currentOffset instanceof LongMsgOffset) || !(latestOffset instanceof LongMsgOffset)) {
            return 0L;
        }
        return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
    }

    /**
     * Shuts down the scheduled executor and, if the server is ready to serve queries, stops
     * tracking every partition so that its gauges are removed.
     */
    public void shutdown() {
        // The executor is created in the constructor, so it is always shut down here.
        _scheduledExecutor.shutdown();
        if (!_isServerReadyToServeQueries.get()) {
            return;
        }
        for (Integer partitionId : _ingestionInfoMap.keySet()) {
            removePartitionId(partitionId);
        }
    }
}
