package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/ConsumerCoordinator.class */
/**
 * Coordinates when a consuming segment is allowed to start consuming.
 *
 * <p>Two mechanisms are combined:
 * <ul>
 *   <li>A single-permit {@link Semaphore}: {@link #acquire(LLCSegmentName)} blocks until the permit is available and
 *       {@link #release()} hands it back.</li>
 *   <li>Optional ordering (enabled through the constructor flag): {@link #acquire(LLCSegmentName)} first waits until
 *       the previous segment's sequence number has been announced through {@link #register(LLCSegmentName)}.</li>
 * </ul>
 *
 * <p>While waiting, the segment's ZK metadata is re-checked every {@code WAIT_INTERVAL_MS} so that a segment which
 * has been deleted or already completed stops waiting with a {@link ShouldNotConsumeException}.
 */
public class ConsumerCoordinator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerCoordinator.class);
    /** Maximum time a single blocking wait lasts before the segment status is re-validated. */
    private static final long WAIT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(3);
    private final boolean _enforceConsumptionInOrder;
    private final RealtimeTableDataManager _realtimeTableDataManager;
    private final boolean _useIdealStateToCalculatePreviousSegment;
    private final ServerMetrics _serverMetrics;
    private final Semaphore _semaphore = new Semaphore(1);
    /** Guards updates of {@link #_maxSequenceNumberRegistered} and backs {@link #_condition}. */
    private final Lock _lock = new ReentrantLock();
    /** Signalled by {@link #register(LLCSegmentName)} whenever the max registered sequence number grows. */
    private final Condition _condition = _lock.newCondition();
    private final AtomicBoolean _firstTransitionProcessed = new AtomicBoolean(false);
    /** Highest sequence number registered so far; {@code -1} until the first registration. */
    private volatile int _maxSequenceNumberRegistered = -1;

    /** Thrown when a segment must not (or no longer) start consuming, e.g. it was deleted or already completed. */
    public static class ShouldNotConsumeException extends Exception {
        public ShouldNotConsumeException(String str) {
            super(str);
        }
    }

    /**
     * @param z whether consumption must be started in sequence-number order
     * @param realtimeTableDataManager table data manager used to read the stream ingestion config, table name,
     *     server instance, Helix manager and segment ZK metadata
     */
    public ConsumerCoordinator(boolean z, RealtimeTableDataManager realtimeTableDataManager) {
        _enforceConsumptionInOrder = z;
        _realtimeTableDataManager = realtimeTableDataManager;
        StreamIngestionConfig streamIngestionConfig = realtimeTableDataManager.getStreamIngestionConfig();
        _useIdealStateToCalculatePreviousSegment =
            streamIngestionConfig != null && streamIngestionConfig.isUseIdealStateToCalculatePreviousSegment();
        _serverMetrics = ServerMetrics.get();
    }

    /**
     * Blocks until the given segment may consume.
     *
     * <p>When ordering is enforced, first waits for the previous segment to be registered and reports the time spent
     * under {@code PREV_SEGMENT_WAIT_TIME_MS}. Then repeatedly tries to take the semaphore permit, re-checking the
     * segment status after every timed-out attempt.
     *
     * @param lLCSegmentName the segment that wants to start consuming
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ShouldNotConsumeException if the segment is found to be deleted or already completed while waiting
     */
    public void acquire(LLCSegmentName lLCSegmentName) throws InterruptedException, ShouldNotConsumeException {
        String segmentName = lLCSegmentName.getSegmentName();
        if (_enforceConsumptionInOrder) {
            long waitStartMs = System.currentTimeMillis();
            waitForPreviousSegment(lLCSegmentName);
            _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
                ServerTimer.PREV_SEGMENT_WAIT_TIME_MS, System.currentTimeMillis() - waitStartMs,
                TimeUnit.MILLISECONDS);
        }
        long acquireStartMs = System.currentTimeMillis();
        while (!_semaphore.tryAcquire(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("Failed to acquire consumer semaphore for segment: {} in: {}ms. Retrying.", segmentName,
                System.currentTimeMillis() - acquireStartMs);
            checkSegmentStatus(segmentName);
        }
    }

    /** Returns the semaphore permit taken by {@link #acquire(LLCSegmentName)}. */
    public void release() {
        _semaphore.release();
    }

    @VisibleForTesting
    Semaphore getSemaphore() {
        return _semaphore;
    }

    /**
     * Announces the given segment to waiters. No-op unless ordering is enforced.
     *
     * <p>If the segment's sequence number exceeds the highest one registered so far, the maximum is advanced and all
     * threads waiting in {@link #waitForPreviousSegment(LLCSegmentName, int)} are woken up.
     *
     * @param lLCSegmentName the segment being registered
     */
    public void register(LLCSegmentName lLCSegmentName) {
        if (!_enforceConsumptionInOrder) {
            return;
        }
        _lock.lock();
        try {
            int sequenceNumber = lLCSegmentName.getSequenceNumber();
            LOGGER.info("Registering segment: {} with sequence number: {}. maxSequenceNumberRegistered: {}, firstTransitionProcessed: {}, Difference in sequence more than one: {}",
                lLCSegmentName.getSegmentName(), sequenceNumber, _maxSequenceNumberRegistered,
                _firstTransitionProcessed.get(), sequenceNumber - _maxSequenceNumberRegistered > 1);
            if (sequenceNumber > _maxSequenceNumberRegistered) {
                _maxSequenceNumberRegistered = sequenceNumber;
                _condition.signalAll();
            }
        } finally {
            _lock.unlock();
        }
    }

    /**
     * Determines which sequence number must be registered before the given segment may proceed, and waits for it.
     *
     * <p>Once the first transition has been processed (and the ideal-state option is off), the previous segment is
     * simply {@code sequenceNumber - 1}. Otherwise the previous sequence number is looked up from the ideal state,
     * but only when {@code sequenceNumber - 1} has not been registered yet; afterwards the first transition is marked
     * as processed.
     */
    private void waitForPreviousSegment(LLCSegmentName lLCSegmentName)
        throws InterruptedException, ShouldNotConsumeException {
        int expectedPreviousSequenceNumber = lLCSegmentName.getSequenceNumber() - 1;
        if (_firstTransitionProcessed.get() && !_useIdealStateToCalculatePreviousSegment) {
            waitForPreviousSegment(lLCSegmentName, expectedPreviousSequenceNumber);
            return;
        }
        if (_maxSequenceNumberRegistered < expectedPreviousSequenceNumber) {
            waitForPreviousSegment(lLCSegmentName, getPreviousSegmentSequenceNumberFromIdealState(lLCSegmentName));
        }
        _firstTransitionProcessed.set(true);
    }

    /**
     * Blocks until a sequence number of at least {@code i} has been registered.
     *
     * <p>Every time a wait of {@code WAIT_INTERVAL_MS} elapses without a signal, the current segment's status is
     * re-validated and the previous sequence number is refreshed from the ideal state.
     *
     * @param lLCSegmentName the segment that is waiting
     * @param i sequence number of the previous segment to wait for
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ShouldNotConsumeException if the waiting segment is found to be deleted or already completed
     */
    @VisibleForTesting
    void waitForPreviousSegment(LLCSegmentName lLCSegmentName, int i)
        throws InterruptedException, ShouldNotConsumeException {
        // Fast path without locking: _maxSequenceNumberRegistered is volatile.
        if (i <= _maxSequenceNumberRegistered) {
            return;
        }
        int previousSequenceNumber = i;
        long startTimeMs = System.currentTimeMillis();
        _lock.lock();
        try {
            // The lock must be held across the whole loop: Condition.await() requires the caller to own the lock on
            // every call, and the lock has to be released exactly once on every exit path (including the case where
            // the loop body never runs because register() advanced the max between the fast path and lock()).
            while (previousSequenceNumber > _maxSequenceNumberRegistered) {
                if (!_condition.await(WAIT_INTERVAL_MS, TimeUnit.MILLISECONDS)) {
                    String segmentName = lLCSegmentName.getSegmentName();
                    checkSegmentStatus(segmentName);
                    LOGGER.warn("Waited on previous segment with sequence number: {} for: {}ms. Refreshing the previous segment sequence number for current segment: {}",
                        previousSequenceNumber, System.currentTimeMillis() - startTimeMs, segmentName);
                    previousSequenceNumber = getPreviousSegmentSequenceNumberFromIdealState(lLCSegmentName);
                }
            }
        } finally {
            _lock.unlock();
        }
    }

    /**
     * Scans the table's segment assignment for the segment preceding the given one on this server.
     *
     * <p>Considers only segments whose state for this server instance is {@code ONLINE}, whose name parses as an
     * {@link LLCSegmentName}, and which belong to the same partition group. The elapsed time is reported under
     * {@code PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS}.
     *
     * @param lLCSegmentName the current segment
     * @return the largest matching sequence number strictly below the current segment's, or {@code -1} if none
     */
    @VisibleForTesting
    int getPreviousSegmentSequenceNumberFromIdealState(LLCSegmentName lLCSegmentName) {
        long startTimeMs = System.currentTimeMillis();
        int previousSequenceNumber = -1;
        String serverInstance = _realtimeTableDataManager.getServerInstance();
        int partitionGroupId = lLCSegmentName.getPartitionGroupId();
        int currentSequenceNumber = lLCSegmentName.getSequenceNumber();
        for (Map.Entry<String, Map<String, String>> entry : getSegmentAssignment().entrySet()) {
            if (!"ONLINE".equals(entry.getValue().get(serverInstance))) {
                continue;
            }
            LLCSegmentName candidate = LLCSegmentName.of(entry.getKey());
            if (candidate == null || candidate.getPartitionGroupId() != partitionGroupId) {
                continue;
            }
            int candidateSequenceNumber = candidate.getSequenceNumber();
            if (candidateSequenceNumber > previousSequenceNumber && candidateSequenceNumber < currentSequenceNumber) {
                previousSequenceNumber = candidateSequenceNumber;
            }
        }
        long elapsedMs = System.currentTimeMillis() - startTimeMs;
        LOGGER.info("Fetched previous segment sequence number: {} to current segment: {} in: {}ms.",
            previousSequenceNumber, lLCSegmentName.getSegmentName(), elapsedMs);
        _serverMetrics.addTimedTableValue(_realtimeTableDataManager.getTableName(),
            ServerTimer.PREV_SEGMENT_FETCH_IDEAL_STATE_TIME_MS, elapsedMs, TimeUnit.MILLISECONDS);
        return previousSequenceNumber;
    }

    /**
     * Reads the table's ideal state and returns its map fields (keyed by segment name in
     * {@link #getPreviousSegmentSequenceNumberFromIdealState(LLCSegmentName)}).
     *
     * @throws IllegalStateException if no ideal state is found for the table
     */
    @VisibleForTesting
    Map<String, Map<String, String>> getSegmentAssignment() {
        String tableName = _realtimeTableDataManager.getTableName();
        IdealState tableIdealState = HelixHelper.getTableIdealState(_realtimeTableDataManager.getHelixManager(), tableName);
        Preconditions.checkState(tableIdealState != null, "Failed to find ideal state for table: %s", tableName);
        return tableIdealState.getRecord().getMapFields();
    }

    @VisibleForTesting
    Lock getLock() {
        return _lock;
    }

    @VisibleForTesting
    AtomicBoolean getFirstTransitionProcessed() {
        return _firstTransitionProcessed;
    }

    @VisibleForTesting
    int getMaxSequenceNumberRegistered() {
        return _maxSequenceNumberRegistered;
    }

    /**
     * Verifies from ZK metadata that the named segment should still be consumed.
     *
     * @param str name of the segment to check
     * @throws ShouldNotConsumeException if the segment's ZK metadata is missing or its status is completed
     */
    @VisibleForTesting
    void checkSegmentStatus(String str) throws ShouldNotConsumeException {
        SegmentZKMetadata zkMetadata = _realtimeTableDataManager.fetchZKMetadataNullable(str);
        if (zkMetadata == null) {
            throw new ShouldNotConsumeException("Segment: " + str + " is deleted");
        }
        if (zkMetadata.getStatus().isCompleted()) {
            throw new ShouldNotConsumeException(
                "Segment: " + str + " is already completed with status: " + zkMetadata.getStatus());
        }
    }
}
