package org.apache.pinot.server.starter.helix;

import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

/* loaded from: input_file:org/apache/pinot/server/starter/helix/FreshnessBasedConsumptionStatusChecker.class */
public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsumptionStatusChecker {
    private final long _minFreshnessMs;
    private final long _idleTimeoutMs;

    public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> set, long j, long j2) {
        super(instanceDataManager, set);
        this._minFreshnessMs = j;
        this._idleTimeoutMs = j2;
    }

    private boolean isOffsetCaughtUp(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2) {
        return (streamPartitionMsgOffset == null || streamPartitionMsgOffset2 == null || streamPartitionMsgOffset.compareTo(streamPartitionMsgOffset2) < 0) ? false : true;
    }

    private boolean segmentHasBeenIdleLongerThanThreshold(long j) {
        return this._idleTimeoutMs > 0 && j > this._idleTimeoutMs;
    }

    protected long now() {
        return System.currentTimeMillis();
    }

    @Override // org.apache.pinot.server.starter.helix.IngestionBasedConsumptionStatusChecker
    protected boolean isSegmentCaughtUp(String str, LLRealtimeSegmentDataManager lLRealtimeSegmentDataManager) {
        long now = now();
        long latestIngestionTimestamp = lLRealtimeSegmentDataManager.getSegment().getSegmentMetadata().getLatestIngestionTimestamp();
        long j = now - latestIngestionTimestamp;
        if (latestIngestionTimestamp >= 0 && j <= this._minFreshnessMs) {
            this._logger.info("Segment {} with freshness {}ms has caught up within min freshness {}", str, Long.valueOf(j), Long.valueOf(this._minFreshnessMs));
            return true;
        }
        StreamPartitionMsgOffset currentOffset = lLRealtimeSegmentDataManager.getCurrentOffset();
        StreamPartitionMsgOffset fetchLatestStreamOffset = lLRealtimeSegmentDataManager.fetchLatestStreamOffset(5000L);
        if (isOffsetCaughtUp(currentOffset, fetchLatestStreamOffset)) {
            this._logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. But the current ingested offset is equal to the latest available offset {}.", str, Long.valueOf(j), Long.valueOf(this._minFreshnessMs), currentOffset);
            return true;
        }
        StreamPartitionMsgOffset fetchEarliestStreamOffset = lLRealtimeSegmentDataManager.fetchEarliestStreamOffset(5000L);
        long timeSinceEventLastConsumedMs = lLRealtimeSegmentDataManager.getTimeSinceEventLastConsumedMs();
        if (segmentHasBeenIdleLongerThanThreshold(timeSinceEventLastConsumedMs)) {
            this._logger.warn("Segment {} with freshness {}ms has not caught up within min freshness {}. But the current ingested offset {} has been idle for {}ms. At offset {}. Earliest offset {}. Latest offset {}.", str, Long.valueOf(j), Long.valueOf(this._minFreshnessMs), currentOffset, Long.valueOf(timeSinceEventLastConsumedMs), currentOffset, fetchEarliestStreamOffset, fetchLatestStreamOffset);
            return true;
        }
        this._logger.info("Segment {} with freshness {}ms has not caught up within min freshness {}. At offset {}. Earliest offset {}. Latest offset {}.", str, Long.valueOf(j), Long.valueOf(this._minFreshnessMs), currentOffset, fetchEarliestStreamOffset, fetchLatestStreamOffset);
        return false;
    }
}
