package org.apache.pinot.core.data.manager.realtime;

import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.eclipse.jetty.server.session.HouseKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.class */
/**
 * Tracks the latest ingestion timestamps of each partition of one realtime table and exposes the
 * resulting ingestion delays as per-partition server gauges.
 *
 * <p>Consumers report timestamps through {@link #updateIngestionDelay(long, long, int)}. The first
 * report for a partition registers its gauges; the gauges then compute the delay lazily as
 * "current clock time minus last reported timestamp" every time they are read.
 *
 * <p>Partitions can be flagged with {@link #markPartitionForVerification(int)}. A timer task
 * periodically calls {@link #timeoutInactivePartitions()}, which stops tracking flagged partitions
 * that have been flagged for longer than {@link #PARTITION_TIMEOUT_MS} and are no longer reported
 * as hosted by the table data manager.
 *
 * <p>All state-changing operations except {@link #stopTrackingPartitionIngestionDelay(int)} are
 * no-ops while the "server ready to serve queries" supplier returns {@code false}.
 */
public class IngestionDelayTracker {
    /** Default period, in milliseconds, between two runs of the partition-timeout timer task. */
    private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000;
    /** Time, in milliseconds, a partition must stay marked before it becomes eligible for removal. */
    private static final int PARTITION_TIMEOUT_MS = 600000;
    /** Delay, in milliseconds, before the first run of the timer task. */
    private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
    private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());

    /** Last reported timestamps, keyed by partition group id. */
    private final Map<Integer, IngestionTimestamps> _partitionToIngestionTimestampsMap;
    /** Partition group id mapped to the clock time (ms) at which it was marked for verification. */
    private final Map<Integer, Long> _partitionsMarkedForVerification;
    final int _timerThreadTickIntervalMs;
    private final Timer _timer;
    private final ServerMetrics _serverMetrics;
    private final String _tableNameWithType;
    private final String _metricName;
    private final RealtimeTableDataManager _realTimeTableDataManager;
    private final Supplier<Boolean> _isServerReadyToServeQueries;
    private Clock _clock;

    /** Immutable pair of timestamps (epoch milliseconds) reported for one partition. */
    public static class IngestionTimestamps {
        private final long _ingestionTimeMs;
        private final long _firstStreamIngestionTimeMs;

        IngestionTimestamps(long ingestionTimeMs, long firstStreamIngestionTimeMs) {
            _ingestionTimeMs = ingestionTimeMs;
            _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
        }
    }

    /**
     * Creates a tracker and starts its timer thread.
     *
     * @param serverMetrics metrics registry on which the per-partition gauges are (un)registered
     * @param tableNameWithType table name with type suffix; also used as the gauge metric name
     * @param realtimeTableDataManager queried for the partitions currently hosted by this server
     * @param timerThreadTickIntervalMs period of the timeout task in milliseconds, must be &gt; 0
     * @param isServerReadyToServeQueries supplier gating all tracker updates
     * @throws RuntimeException if {@code timerThreadTickIntervalMs} is not positive
     */
    public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
            RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs,
            Supplier<Boolean> isServerReadyToServeQueries) throws RuntimeException {
        // Validate before any thread is created so a bad argument cannot leave a timer running.
        if (timerThreadTickIntervalMs <= 0) {
            throw new RuntimeException(
                    String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s",
                            timerThreadTickIntervalMs, tableNameWithType));
        }
        _partitionToIngestionTimestampsMap = new ConcurrentHashMap<>();
        _partitionsMarkedForVerification = new ConcurrentHashMap<>();
        _serverMetrics = serverMetrics;
        _tableNameWithType = tableNameWithType;
        _metricName = tableNameWithType;
        _realTimeTableDataManager = realtimeTableDataManager;
        _clock = Clock.systemUTC();
        _isServerReadyToServeQueries = isServerReadyToServeQueries;
        _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
        _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType));
        _timer.schedule(new TimerTask() {
            @Override
            public void run() {
                timeoutInactivePartitions();
            }
        }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
    }

    /**
     * Creates a tracker whose timeout task runs every {@link #TIMER_THREAD_TICK_INTERVAL_MS}
     * milliseconds.
     */
    public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType,
            RealtimeTableDataManager realtimeTableDataManager, Supplier<Boolean> isServerReadyToServeQueries) {
        this(serverMetrics, tableNameWithType, realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS,
                isServerReadyToServeQueries);
    }

    /**
     * Converts a timestamp into a delay relative to the current clock.
     *
     * @param ingestionTimeMs timestamp in epoch milliseconds; negative values mean "not available"
     * @return {@code now - ingestionTimeMs} clamped to be non-negative, or 0 for a negative input
     */
    private long getIngestionDelayMs(long ingestionTimeMs) {
        if (ingestionTimeMs < 0) {
            return 0L;
        }
        // Clamp: a timestamp ahead of the local clock must not produce a negative delay.
        return Math.max(_clock.millis() - ingestionTimeMs, 0L);
    }

    /**
     * Forgets all state kept for a partition and unregisters both of its gauges.
     *
     * @param partitionGroupId partition to stop tracking
     */
    private void removePartitionId(int partitionGroupId) {
        _partitionToIngestionTimestampsMap.remove(partitionGroupId);
        _partitionsMarkedForVerification.remove(partitionGroupId);
        _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
        // updateIngestionDelay may also have registered the end-to-end gauge; remove it as well so
        // it does not outlive the partition.
        _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
                ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
    }

    /**
     * Returns the partitions that have been marked for verification for longer than
     * {@link #PARTITION_TIMEOUT_MS}.
     */
    private List<Integer> getPartitionsToBeVerified() {
        List<Integer> partitionsToVerify = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) {
            long markedForMs = _clock.millis() - entry.getValue();
            if (markedForMs > PARTITION_TIMEOUT_MS) {
                partitionsToVerify.add(entry.getKey());
            }
        }
        return partitionsToVerify;
    }

    /** Replaces the clock used for all time computations; intended for tests. */
    @VisibleForTesting
    void setClock(Clock clock) {
        _clock = clock;
    }

    /**
     * Records the latest timestamps for a partition and, on the first report for that partition,
     * registers the corresponding gauges. Also clears any pending verification mark.
     *
     * <p>Does nothing while the server is not ready to serve queries, or when both timestamps are
     * negative.
     *
     * @param ingestionTimeMs ingestion timestamp in epoch ms; negative if unavailable
     * @param firstStreamIngestionTimeMs first-stream ingestion timestamp in epoch ms; negative if
     *     unavailable
     * @param partitionGroupId partition the timestamps belong to
     */
    public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
        if (!_isServerReadyToServeQueries.get()) {
            return;
        }
        if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0) {
            return;
        }
        IngestionTimestamps previous = _partitionToIngestionTimestampsMap.put(partitionGroupId,
                new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
        if (previous == null) {
            // First report for this partition: register only the gauges that have a valid timestamp.
            if (ingestionTimeMs >= 0) {
                _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
                        ServerGauge.REALTIME_INGESTION_DELAY_MS,
                        () -> getPartitionIngestionDelayMs(partitionGroupId));
            }
            if (firstStreamIngestionTimeMs >= 0) {
                _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
                        ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
                        () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
            }
        }
        // A fresh update means the partition is active again, so it no longer needs verification.
        _partitionsMarkedForVerification.remove(partitionGroupId);
    }

    /**
     * Stops tracking the given partition and unregisters its gauges.
     *
     * @param partitionGroupId partition to stop tracking
     */
    public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
        removePartitionId(partitionGroupId);
    }

    /**
     * Stops tracking every partition that has been marked for verification for longer than the
     * partition timeout and is not in the set of partitions reported as hosted by the table data
     * manager. Invoked periodically by the timer thread.
     *
     * <p>Does nothing while the server is not ready to serve queries. A failure while looking up
     * the hosted partitions is logged and the run is abandoned.
     */
    public void timeoutInactivePartitions() {
        if (!_isServerReadyToServeQueries.get()) {
            return;
        }
        List<Integer> partitionsToVerify = getPartitionsToBeVerified();
        if (partitionsToVerify.isEmpty()) {
            return;
        }
        try {
            Set<Integer> hostedPartitions = _realTimeTableDataManager.getHostedPartitionsGroupIds();
            for (int partitionGroupId : partitionsToVerify) {
                if (!hostedPartitions.contains(partitionGroupId)) {
                    removePartitionId(partitionGroupId);
                }
            }
        } catch (Exception e) {
            // The trailing throwable (beyond the three placeholders) makes SLF4J log the stack trace.
            _logger.error("Failed to get partitions hosted by this server, table={}, exception={}:{}",
                    _tableNameWithType, e.getClass(), e.getMessage(), e);
        }
    }

    /**
     * Flags a partition so that a later {@link #timeoutInactivePartitions()} run checks whether it
     * is still hosted. The mark records the current clock time. No-op while the server is not
     * ready to serve queries.
     *
     * @param partitionGroupId partition to flag
     */
    public void markPartitionForVerification(int partitionGroupId) {
        if (_isServerReadyToServeQueries.get()) {
            _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
        }
    }

    /**
     * Returns the current ingestion delay of a partition.
     *
     * @param partitionGroupId partition to query
     * @return delay in milliseconds, or 0 if the partition is not tracked or its ingestion
     *     timestamp is negative
     */
    public long getPartitionIngestionDelayMs(int partitionGroupId) {
        IngestionTimestamps timestamps = _partitionToIngestionTimestampsMap.get(partitionGroupId);
        return timestamps == null ? 0L : getIngestionDelayMs(timestamps._ingestionTimeMs);
    }

    /**
     * Returns the current end-to-end ingestion delay of a partition, based on its first-stream
     * ingestion timestamp.
     *
     * @param partitionGroupId partition to query
     * @return delay in milliseconds, or 0 if the partition is not tracked or its first-stream
     *     ingestion timestamp is negative
     */
    public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) {
        IngestionTimestamps timestamps = _partitionToIngestionTimestampsMap.get(partitionGroupId);
        return timestamps == null ? 0L : getIngestionDelayMs(timestamps._firstStreamIngestionTimeMs);
    }

    /**
     * Cancels the timer and, if the server is ready to serve queries, stops tracking every
     * partition so that the associated gauges are unregistered.
     */
    public void shutdown() {
        _timer.cancel();
        if (!_isServerReadyToServeQueries.get()) {
            return;
        }
        // ConcurrentHashMap iteration is weakly consistent, so removing entries while iterating is safe.
        for (Integer partitionGroupId : _partitionToIngestionTimestampsMap.keySet()) {
            removePartitionId(partitionGroupId);
        }
    }
}
