package org.apache.pinot.controller.helix.core.realtime;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.class */
public class SegmentCompletionManager {
    /** Class-wide logger; the nested FSM also logs through it as {@code SegmentCompletionManager.LOGGER}. */
    public static final Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionManager.class);
    private final HelixManager _helixManager;
    /** State machines tracked by this manager. NOTE(review): keys are presumably segment names - confirm where the map is populated. */
    private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>();
    /**
     * Commit duration in milliseconds remembered per raw table name. The FSM constructor reads it to
     * lengthen its commit allowance, and the FSM writes it when a commit starts later than that allowance.
     */
    private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
    private final PinotLLCRealtimeSegmentManager _segmentManager;
    /** Receives the {@code LLC_STATE_MACHINE_ABORTS} meter updates emitted by the FSM. */
    private final ControllerMetrics _controllerMetrics;
    private final LeadControllerManager _leadControllerManager;
    /** NOTE(review): presumably sized by {@link #NUM_FSM_LOCKS}; the initialisation is not visible here. */
    private final Lock[] _fsmLocks;
    private static final int NUM_FSM_LOCKS = 20;
    /** 1800 seconds (30 minutes); the FSM code below uses the equivalent literal 1,800,000 ms as its cap. */
    private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager$SegmentCompletionFSM.class */
    public static class SegmentCompletionFSM {
        // Added to the FSM start time to obtain the deadline for picking a winner.
        private static final long MAX_TIME_TO_PICK_WINNER_MS = 3300;
        // Added to the FSM start time to obtain the deadline for notifying the winner; also the base of the commit allowance.
        private static final long MAX_TIME_TO_NOTIFY_WINNER_MS = 6600;
        // Per-segment logger, named "SegmentCompletionFSM_" + segment name.
        public final Logger _logger;
        // Current state; read and written under synchronized(this) by the public entry points.
        State _state;
        // Time (from the manager's clock) at which this FSM was constructed.
        final long _startTimeMs;
        private final LLCSegmentName _segmentName;
        private final String _rawTableName;
        private final String _realtimeTableName;
        private final int _numReplicas;
        // Despite the name this is a Set: instances that reported stoppedConsuming and have not reported segmentConsumed since.
        private final Set<String> _excludedServerStateMap;
        // Last offset each instance reported through segmentConsumed.
        private final Map<String, StreamPartitionMsgOffset> _commitStateMap;
        private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
        // Offset and instance id of the chosen committer; the offset is null until chosen (or set directly for an FSM created in COMMITTED).
        private StreamPartitionMsgOffset _winningOffset;
        private String _winner;
        private final PinotLLCRealtimeSegmentManager _segmentManager;
        private final SegmentCompletionManager _segmentCompletionManager;
        // Absolute deadlines: _startTimeMs plus the corresponding MAX_TIME_* constant.
        private final long _maxTimeToPickWinnerMs;
        private final long _maxTimeToNotifyWinnerMs;
        // Commit allowance (a duration in ms) computed at construction.
        private final long _initialCommitTimeMs;
        // Absolute commit deadline; starts at _startTimeMs + _initialCommitTimeMs and can be moved by extendBuildTime.
        private long _maxTimeAllowedToCommitMs;
        private final boolean _isSplitCommitEnabled;
        private final String _controllerVipUrl;

        /**
         * Creates a state machine using the four-argument constructor, which starts in {@code HOLDING}.
         *
         * @param i number of replicas of the segment
         * @return the new state machine
         */
        public static SegmentCompletionFSM fsmInHolding(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i) {
            final SegmentCompletionFSM holdingFsm = new SegmentCompletionFSM(pinotLLCRealtimeSegmentManager, segmentCompletionManager, lLCSegmentName, i);
            return holdingFsm;
        }

        /**
         * Creates a state machine using the five-argument constructor, which starts in {@code COMMITTED}
         * with the supplied offset recorded as the winning offset.
         *
         * @param i number of replicas of the segment
         * @param streamPartitionMsgOffset offset recorded as the winning offset
         * @return the new state machine
         */
        public static SegmentCompletionFSM fsmInCommit(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            final SegmentCompletionFSM committedFsm = new SegmentCompletionFSM(pinotLLCRealtimeSegmentManager, segmentCompletionManager, lLCSegmentName, i, streamPartitionMsgOffset);
            return committedFsm;
        }

        /**
         * Creates a state machine and moves it to {@code PARTIAL_CONSUMING} before returning it.
         *
         * @param i number of replicas of the segment
         * @return the new state machine
         */
        public static SegmentCompletionFSM fsmStoppedConsuming(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i) {
            final SegmentCompletionFSM partialFsm = new SegmentCompletionFSM(pinotLLCRealtimeSegmentManager, segmentCompletionManager, lLCSegmentName, i);
            // The constructor leaves the FSM in HOLDING; override it for this entry path.
            partialFsm._state = State.PARTIAL_CONSUMING;
            return partialFsm;
        }

        /**
         * Builds a state machine in {@code HOLDING} with no winner chosen.
         *
         * <p>The commit allowance starts as {@code MAX_TIME_TO_NOTIFY_WINNER_MS} plus the table's commit
         * timeout obtained from the segment manager. It is raised to the value remembered for the raw table
         * in the manager's commit-time map when that value is larger, and is finally capped at
         * 1,800,000 ms (1800 s).
         *
         * @param pinotLLCRealtimeSegmentManager segment manager used for the commit timeout and later callbacks
         * @param segmentCompletionManager owning manager; supplies the clock, offset factory and commit settings
         * @param lLCSegmentName segment this state machine tracks
         * @param i number of replicas of the segment
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i) {
            this._state = State.HOLDING;
            this._winningOffset = null;
            this._segmentName = lLCSegmentName;
            this._rawTableName = this._segmentName.getTableName();
            this._realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(this._rawTableName);
            this._numReplicas = i;
            this._segmentManager = pinotLLCRealtimeSegmentManager;
            // Typed (diamond) construction instead of raw HashMap/HashSet, so no unchecked conversion is needed.
            this._commitStateMap = new HashMap<>(this._numReplicas);
            this._excludedServerStateMap = new HashSet<>(this._numReplicas);
            this._segmentCompletionManager = segmentCompletionManager;
            this._startTimeMs = this._segmentCompletionManager.getCurrentTimeMs();
            this._maxTimeToPickWinnerMs = this._startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
            this._maxTimeToNotifyWinnerMs = this._startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
            this._streamPartitionMsgOffsetFactory = this._segmentCompletionManager.getStreamPartitionMsgOffsetFactory(this._segmentName);
            long commitTimeoutMS = MAX_TIME_TO_NOTIFY_WINNER_MS + this._segmentManager.getCommitTimeoutMS(this._realtimeTableName);
            // Prefer a longer commit time previously remembered for this table.
            Long l = this._segmentCompletionManager._commitTimeMap.get(this._rawTableName);
            if (l != null && l.longValue() > commitTimeoutMS) {
                commitTimeoutMS = l.longValue();
            }
            this._logger = LoggerFactory.getLogger("SegmentCompletionFSM_" + lLCSegmentName.getSegmentName());
            // Upper bound of 1800 s, expressed in milliseconds.
            if (commitTimeoutMS > 1800000) {
                this._logger.info("Configured max commit time {}s too high for table {}, changing to {}s", Long.valueOf(commitTimeoutMS / 1000), this._realtimeTableName, 1800);
                commitTimeoutMS = 1800000;
            }
            this._initialCommitTimeMs = commitTimeoutMS;
            this._maxTimeAllowedToCommitMs = this._startTimeMs + this._initialCommitTimeMs;
            this._isSplitCommitEnabled = segmentCompletionManager.isSplitCommitEnabled();
            this._controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
        }

        /**
         * Builds a state machine that is already {@code COMMITTED}: delegates to the four-argument
         * constructor, then records the given offset as the winning offset and {@code "UNKNOWN"} as the winner.
         *
         * @param streamPartitionMsgOffset offset stored as the winning offset
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName lLCSegmentName, int i, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            this(pinotLLCRealtimeSegmentManager, segmentCompletionManager, lLCSegmentName, i);
            // The committing instance is not known on this path.
            this._winner = "UNKNOWN";
            this._winningOffset = streamPartitionMsgOffset;
            this._state = State.COMMITTED;
        }

        /**
         * Renders the state machine as
         * {@code {segmentName,state,startTimeMs,winner,winningOffset,splitCommitEnabled,controllerVipUrl}}.
         *
         * <p>The earlier form printed the segment name a second time in the fourth position and fetched
         * the controller VIP URL without ever emitting it; each field now appears exactly once.
         */
        @Override
        public String toString() {
            return "{" + this._segmentName.getSegmentName() + "," + this._state + "," + this._startTimeMs + "," + this._winner + "," + this._winningOffset + "," + this._isSplitCommitEnabled + "," + this._controllerVipUrl + "}";
        }

        /**
         * @return {@code true} when the state is {@code COMMITTED} or {@code ABORTED}
         */
        public boolean isDone() {
            final State current = this._state;
            if (current.equals(State.COMMITTED)) {
                return true;
            }
            return current.equals(State.ABORTED);
        }

        /**
         * Handles a segmentConsumed report from an instance.
         *
         * <p>Under this FSM's monitor the instance is removed from the excluded set if present, its
         * offset is recorded, and the call is dispatched to the handler for the current state.
         *
         * @param str id of the reporting instance
         * @param streamPartitionMsgOffset offset the instance reports
         * @param str2 passed through to the PARTIAL_CONSUMING / HOLDING handlers
         * @return the response produced by the state-specific handler
         */
        public SegmentCompletionProtocol.Response segmentConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                this._logger.info("Processing segmentConsumed({}, {})", str, streamPartitionMsgOffset);
                // An instance that reports again is no longer treated as having stopped consuming.
                if (this._excludedServerStateMap.remove(str)) {
                    this._logger.info("Marking instance {} alive again", str);
                }
                this._commitStateMap.put(str, streamPartitionMsgOffset);
                final SegmentCompletionProtocol.Response response;
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        response = partialConsumingConsumed(str, streamPartitionMsgOffset, now, str2);
                        break;
                    case HOLDING:
                        response = holdingConsumed(str, streamPartitionMsgOffset, now, str2);
                        break;
                    case COMMITTER_DECIDED:
                        response = committerDecidedConsumed(str, streamPartitionMsgOffset, now);
                        break;
                    case COMMITTER_NOTIFIED:
                        response = committerNotifiedConsumed(str, streamPartitionMsgOffset, now);
                        break;
                    case COMMITTER_UPLOADING:
                        response = committerUploadingConsumed(str, streamPartitionMsgOffset, now);
                        break;
                    case COMMITTING:
                        response = committingConsumed(str, streamPartitionMsgOffset, now);
                        break;
                    case COMMITTED:
                        response = committedConsumed(str, streamPartitionMsgOffset);
                        break;
                    case ABORTED:
                        response = hold(str, streamPartitionMsgOffset);
                        break;
                    default:
                        response = fail(str, streamPartitionMsgOffset);
                        break;
                }
                return response;
            }
        }

        /**
         * Handles a request from an instance to start committing the segment.
         *
         * <p>A request from an instance in the excluded set is rejected with {@code RESP_FAILED}.
         * That check now runs under this FSM's monitor: the set is a plain {@code HashSet} that
         * {@code segmentConsumed} and {@code stoppedConsuming} mutate while holding the same monitor,
         * so reading it without the lock was a data race.
         *
         * @param str id of the requesting instance
         * @param streamPartitionMsgOffset offset the instance wants to commit at
         * @return the response produced by the handler for the current state, or {@code RESP_FAILED}
         */
        public SegmentCompletionProtocol.Response segmentCommitStart(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            long currentTimeMs = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                if (this._excludedServerStateMap.contains(str)) {
                    this._logger.warn("Not accepting commit from {} since it had stoppd consuming", str);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
                this._logger.info("Processing segmentCommitStart({}, {})", str, streamPartitionMsgOffset);
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        return partialConsumingCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case HOLDING:
                        return holdingCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case COMMITTER_DECIDED:
                        return committerDecidedCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case COMMITTER_NOTIFIED:
                        return committerNotifiedCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case COMMITTER_UPLOADING:
                        return committerUploadingCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case COMMITTING:
                        return committingCommit(str, streamPartitionMsgOffset, currentTimeMs);
                    case COMMITTED:
                        return committedCommit(str, streamPartitionMsgOffset);
                    case ABORTED:
                        return hold(str, streamPartitionMsgOffset);
                    default:
                        return fail(str, streamPartitionMsgOffset);
                }
            }
        }

        /**
         * Handles a report that an instance stopped consuming the segment.
         *
         * <p>Under this FSM's monitor the instance is added to the excluded set and the call is
         * dispatched by state. In {@code ABORTED} the message is ignored and {@code RESP_PROCESSED} is returned.
         *
         * @param str id of the reporting instance
         * @param streamPartitionMsgOffset offset at which the instance stopped
         * @param str2 reason text, passed through to the state handlers
         * @return the response produced for the current state
         */
        public SegmentCompletionProtocol.Response stoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            synchronized (this) {
                this._logger.info("Processing stoppedConsuming({}, {})", str, streamPartitionMsgOffset);
                this._excludedServerStateMap.add(str);
                final SegmentCompletionProtocol.Response response;
                switch (this._state) {
                    case PARTIAL_CONSUMING:
                        response = partialConsumingStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case HOLDING:
                        response = holdingStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case COMMITTER_DECIDED:
                        response = committerDecidedStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case COMMITTER_NOTIFIED:
                        response = committerNotifiedStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case COMMITTER_UPLOADING:
                        response = committerUploadingStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case COMMITTING:
                        response = committingStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case COMMITTED:
                        response = committedStoppedConsuming(str, streamPartitionMsgOffset, str2);
                        break;
                    case ABORTED:
                        this._logger.info("Ignoring StoppedConsuming message from {} in state {}", str, this._state);
                        response = SegmentCompletionProtocol.RESP_PROCESSED;
                        break;
                    default:
                        response = fail(str, streamPartitionMsgOffset);
                        break;
                }
                return response;
            }
        }

        /**
         * Handles a request to extend the segment build time.
         *
         * <p>Only {@code COMMITTER_NOTIFIED} accepts the request; every other state answers through
         * {@code fail}.
         *
         * @param str id of the requesting instance
         * @param streamPartitionMsgOffset offset supplied with the request
         * @param i requested extension in seconds
         * @return the handler's response in {@code COMMITTER_NOTIFIED}, otherwise {@code RESP_FAILED}
         */
        public SegmentCompletionProtocol.Response extendBuildTime(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, int i) {
            final long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                this._logger.info("Processing extendBuildTime({}, {}, {})", str, streamPartitionMsgOffset, Integer.valueOf(i));
                switch (this._state) {
                    case COMMITTER_NOTIFIED:
                        return committerNotifiedExtendBuildTime(str, streamPartitionMsgOffset, i, now);
                    default:
                        // Every remaining state rejects the extension.
                        return fail(str, streamPartitionMsgOffset);
                }
            }
        }

        /**
         * Handles the end of a segment commit attempt.
         *
         * <p>The FSM is aborted and {@code RESP_FAILED} returned when the instance is in the excluded
         * set, when the state is not {@code COMMITTER_UPLOADING}, when the caller is not the winner at
         * the winning offset, when {@code z} is false, or when {@code commitSegment} does not return
         * {@code RESP_COMMIT_SUCCESS}.
         *
         * @param params request parameters carrying the instance id and offset
         * @param z whether the segment upload succeeded
         * @param z2 forwarded to {@code commitSegment}
         * @param committingSegmentDescriptor forwarded to {@code commitSegment}
         * @return {@code RESP_COMMIT_SUCCESS} on success, otherwise {@code RESP_FAILED}
         */
        public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params, boolean z, boolean z2, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String committer = params.getInstanceId();
            final StreamPartitionMsgOffset reportedOffset = this._streamPartitionMsgOffsetFactory.create(params.getStreamPartitionMsgOffset());
            synchronized (this) {
                if (this._excludedServerStateMap.contains(committer)) {
                    this._logger.warn("Not accepting commitEnd from {} since it had stoppd consuming", committer);
                    return abortAndReturnFailed();
                }
                this._logger.info("Processing segmentCommitEnd({}, {})", committer, reportedOffset);
                final boolean expectedCommitter = this._state.equals(State.COMMITTER_UPLOADING) && committer.equals(this._winner) && reportedOffset.compareTo(this._winningOffset) == 0;
                if (!expectedCommitter) {
                    this._logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", this._state, this._segmentName.getSegmentName(), this._winner, this._winningOffset);
                    return abortAndReturnFailed();
                }
                if (!z) {
                    this._logger.error("Segment upload failed");
                    return abortAndReturnFailed();
                }
                final SegmentCompletionProtocol.Response commitResult = commitSegment(params, z2, committingSegmentDescriptor);
                return commitResult.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS) ? commitResult : abortAndReturnFailed();
            }
        }

        /**
         * Logs a FAIL decision for the instance and offset.
         *
         * @return {@code RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response fail(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            final SegmentCompletionProtocol.Response failed = SegmentCompletionProtocol.RESP_FAILED;
            this._logger.info("{}:FAIL for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            return failed;
        }

        /**
         * Builds a COMMIT response for the instance.
         *
         * <p>The response carries the offset, the build time in seconds (commit deadline minus FSM
         * start time), the split-commit flag and, when split commit is enabled, the controller VIP URL.
         *
         * @param str instance being told to commit
         * @param streamPartitionMsgOffset offset to commit at
         * @return the COMMIT response
         */
        private SegmentCompletionProtocol.Response commit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            final long buildTimeSeconds = (this._maxTimeAllowedToCommitMs - this._startTimeMs) / 1000;
            this._logger.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", this._state, str, streamPartitionMsgOffset, Long.valueOf(buildTimeSeconds));
            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withStreamPartitionMsgOffset(streamPartitionMsgOffset.toString())
                    .withBuildTimeSeconds(buildTimeSeconds)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
                    .withSplitCommit(this._isSplitCommitEnabled);
            if (this._isSplitCommitEnabled) {
                responseParams.withControllerVipUrl(this._controllerVipUrl);
            }
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Logs a DISCARD decision (at warn level) for the instance and offset.
         *
         * @return {@code RESP_DISCARD}
         */
        private SegmentCompletionProtocol.Response discard(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            final SegmentCompletionProtocol.Response discarded = SegmentCompletionProtocol.RESP_DISCARD;
            this._logger.warn("{}:DISCARD for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            return discarded;
        }

        /**
         * Logs a KEEP decision and builds the matching response.
         *
         * @return a response with status {@code KEEP} carrying the given offset
         */
        private SegmentCompletionProtocol.Response keep(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            this._logger.info("{}:KEEP for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            final SegmentCompletionProtocol.Response.Params keepParams = new SegmentCompletionProtocol.Response.Params()
                    .withStreamPartitionMsgOffset(streamPartitionMsgOffset.toString())
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
            return new SegmentCompletionProtocol.Response(keepParams);
        }

        /**
         * Logs a CATCHUP decision and builds the matching response.
         *
         * <p>The response carries the winning offset, not the offset the instance reported.
         *
         * @return a response with status {@code CATCH_UP}
         */
        private SegmentCompletionProtocol.Response catchup(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            this._logger.info("{}:CATCHUP for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            final SegmentCompletionProtocol.Response.Params catchupParams = new SegmentCompletionProtocol.Response.Params()
                    .withStreamPartitionMsgOffset(this._winningOffset.toString())
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
            return new SegmentCompletionProtocol.Response(catchupParams);
        }

        /**
         * Logs a HOLD decision and builds the matching response.
         *
         * @return a response with status {@code HOLD} carrying the given offset
         */
        private SegmentCompletionProtocol.Response hold(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            this._logger.info("{}:HOLD for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            final SegmentCompletionProtocol.Response.Params holdParams = new SegmentCompletionProtocol.Response.Params()
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
                    .withStreamPartitionMsgOffset(streamPartitionMsgOffset.toString());
            return new SegmentCompletionProtocol.Response(holdParams);
        }

        /**
         * Moves the FSM to {@code ABORTED}, bumps the {@code LLC_STATE_MACHINE_ABORTS} meter for the
         * raw table, and answers HOLD.
         *
         * @param j current time in ms; not used by this method
         * @return the HOLD response for the instance and offset
         */
        private SegmentCompletionProtocol.Response abortAndReturnHold(long j, String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            this._state = State.ABORTED;
            final ControllerMetrics metrics = this._segmentCompletionManager._controllerMetrics;
            metrics.addMeteredTableValue(this._rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return hold(str, streamPartitionMsgOffset);
        }

        /**
         * Moves the FSM to {@code ABORTED} and bumps the {@code LLC_STATE_MACHINE_ABORTS} meter for the
         * raw table.
         *
         * @return {@code RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response abortAndReturnFailed() {
            this._state = State.ABORTED;
            final ControllerMetrics metrics = this._segmentCompletionManager._controllerMetrics;
            metrics.addMeteredTableValue(this._rawTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return SegmentCompletionProtocol.RESP_FAILED;
        }

        /**
         * Aborts the FSM when the given time is past the commit deadline.
         *
         * @param j current time in ms
         * @return the HOLD response when the FSM was aborted, or {@code null} when still within the deadline
         */
        private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long j, String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            final boolean pastDeadline = j > this._maxTimeAllowedToCommitMs;
            if (pastDeadline) {
                this._logger.warn("{}:Aborting FSM (too late) instance={} offset={} now={} start={}", this._state, str, streamPartitionMsgOffset, Long.valueOf(j), Long.valueOf(this._startTimeMs));
                return abortAndReturnHold(j, str, streamPartitionMsgOffset);
            }
            return null;
        }

        /**
         * @return the replica count minus the number of instances currently in the excluded set
         */
        private int numReplicasToLookFor() {
            final int excludedCount = this._excludedServerStateMap.size();
            return this._numReplicas - excludedCount;
        }

        /**
         * segmentConsumed in {@code PARTIAL_CONSUMING}: switches the FSM to {@code HOLDING} and then
         * handles the report exactly as the HOLDING state does.
         */
        private SegmentCompletionProtocol.Response partialConsumingConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j, String str2) {
            this._state = State.HOLDING;
            final SegmentCompletionProtocol.Response response = holdingConsumed(str, streamPartitionMsgOffset, j, str2);
            return response;
        }

        /** segmentCommitStart in {@code PARTIAL_CONSUMING}: delegates to {@code processCommitWhileHoldingOrPartialConsuming}. */
        private SegmentCompletionProtocol.Response partialConsumingCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processCommitWhileHoldingOrPartialConsuming(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** stoppedConsuming in {@code PARTIAL_CONSUMING}: delegates to {@code processStoppedConsuming} with the flag set to {@code true}. */
        private SegmentCompletionProtocol.Response partialConsumingStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = true;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * segmentConsumed in {@code HOLDING}.
         *
         * <p>While no winner is picked the instance is told to HOLD. Once a winner exists, the winner
         * gets COMMIT and the FSM moves to {@code COMMITTER_NOTIFIED}; any other instance gets CATCHUP
         * and the FSM moves to {@code COMMITTER_DECIDED}.
         *
         * @param j current time in ms
         * @param str2 forwarded to {@code isWinnerPicked}
         */
        private SegmentCompletionProtocol.Response holdingConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j, String str2) {
            if (!isWinnerPicked(str, j, str2)) {
                return hold(str, streamPartitionMsgOffset);
            }
            if (this._winner.equals(str)) {
                this._logger.info("{}:Committer notified winner instance={} offset={}", this._state, str, streamPartitionMsgOffset);
                // Build the response before the transition so its log line shows the pre-transition state.
                final SegmentCompletionProtocol.Response commitResponse = commit(str, streamPartitionMsgOffset);
                this._state = State.COMMITTER_NOTIFIED;
                return commitResponse;
            }
            this._logger.info("{}:Committer decided winner={} offset={}", this._state, this._winner, this._winningOffset);
            final SegmentCompletionProtocol.Response catchupResponse = catchup(str, streamPartitionMsgOffset);
            this._state = State.COMMITTER_DECIDED;
            return catchupResponse;
        }

        /** segmentCommitStart in {@code HOLDING}: delegates to {@code processCommitWhileHoldingOrPartialConsuming}. */
        private SegmentCompletionProtocol.Response holdingCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processCommitWhileHoldingOrPartialConsuming(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** stoppedConsuming in {@code HOLDING}: delegates to {@code processStoppedConsuming} with the flag set to {@code true}. */
        private SegmentCompletionProtocol.Response holdingStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = true;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * segmentConsumed in {@code COMMITTER_DECIDED}.
         *
         * <ul>
         *   <li>An offset beyond the winning offset aborts the FSM and answers HOLD.</li>
         *   <li>The winner at the winning offset gets COMMIT and the FSM moves to
         *       {@code COMMITTER_NOTIFIED}; the winner at any other offset aborts the FSM.</li>
         *   <li>Another instance gets HOLD at the winning offset and CATCHUP otherwise.</li>
         *   <li>Whatever was decided, once the notify-winner deadline has passed the FSM is aborted
         *       and HOLD is returned.</li>
         * </ul>
         *
         * @param j current time in ms
         */
        private SegmentCompletionProtocol.Response committerDecidedConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            if (streamPartitionMsgOffset.compareTo(this._winningOffset) > 0) {
                this._logger.warn("{}:Aborting FSM (offset larger than winning) instance={} offset={} now={} winning={}", this._state, str, streamPartitionMsgOffset, Long.valueOf(j), this._winningOffset);
                return abortAndReturnHold(j, str, streamPartitionMsgOffset);
            }
            final SegmentCompletionProtocol.Response decided;
            if (this._winner.equals(str)) {
                if (this._winningOffset.compareTo(streamPartitionMsgOffset) == 0) {
                    this._logger.info("{}:Notifying winner instance={} offset={}", this._state, str, streamPartitionMsgOffset);
                    decided = commit(str, streamPartitionMsgOffset);
                    this._state = State.COMMITTER_NOTIFIED;
                } else {
                    this._logger.warn("{}:Winner coming back with different offset for instance={} offset={} prevWinnOffset={}", this._state, str, streamPartitionMsgOffset, this._winningOffset);
                    decided = abortAndReturnHold(j, str, streamPartitionMsgOffset);
                }
            } else if (streamPartitionMsgOffset.compareTo(this._winningOffset) == 0) {
                decided = hold(str, streamPartitionMsgOffset);
            } else {
                decided = catchup(str, streamPartitionMsgOffset);
            }
            // The deadline check overrides whatever was decided above.
            if (j > this._maxTimeToNotifyWinnerMs) {
                return abortAndReturnHold(j, str, streamPartitionMsgOffset);
            }
            return decided;
        }

        /** segmentCommitStart in {@code COMMITTER_DECIDED}: delegates to {@code processCommitWhileHoldingOrPartialConsuming}. */
        private SegmentCompletionProtocol.Response committerDecidedCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processCommitWhileHoldingOrPartialConsuming(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** stoppedConsuming in {@code COMMITTER_DECIDED}: delegates to {@code processStoppedConsuming} with the flag set to {@code false}. */
        private SegmentCompletionProtocol.Response committerDecidedStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = false;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * segmentConsumed in {@code COMMITTER_NOTIFIED}.
         *
         * <p>Past the commit deadline the FSM is aborted and HOLD returned. Otherwise the winner gets
         * COMMIT again at the winning offset, or DISCARD (and the FSM is set to {@code ABORTED}) at any
         * other offset. A non-winner gets CATCHUP when behind the winning offset and HOLD otherwise.
         *
         * @param j current time in ms
         */
        private SegmentCompletionProtocol.Response committerNotifiedConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate != null) {
                return tooLate;
            }
            final int versusWinning = streamPartitionMsgOffset.compareTo(this._winningOffset);
            if (str.equals(this._winner)) {
                if (versusWinning == 0) {
                    return commit(str, streamPartitionMsgOffset);
                }
                // Build the response first so its log line reports the state before the abort.
                final SegmentCompletionProtocol.Response discarded = discard(str, streamPartitionMsgOffset);
                this._logger.warn("{}:Aborting for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
                this._state = State.ABORTED;
                return discarded;
            }
            if (versusWinning < 0) {
                return catchup(str, streamPartitionMsgOffset);
            }
            return hold(str, streamPartitionMsgOffset);
        }

        /**
         * segmentCommitStart in {@code COMMITTER_NOTIFIED}.
         *
         * <p>If {@code checkBadCommitRequest} yields a response it is returned as is. Otherwise the FSM
         * moves to {@code COMMITTER_UPLOADING}; when the time elapsed since the FSM started exceeds the
         * initial commit allowance, that elapsed time is stored in the manager's commit-time map under
         * the segment's table name.
         *
         * @param j current time in ms
         * @return {@code RESP_COMMIT_CONTINUE} when the commit may proceed
         */
        private SegmentCompletionProtocol.Response committerNotifiedCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response rejection = checkBadCommitRequest(str, streamPartitionMsgOffset, j);
            if (rejection != null) {
                return rejection;
            }
            this._logger.info("{}:Uploading for instance={} offset={}", this._state, str, streamPartitionMsgOffset);
            this._state = State.COMMITTER_UPLOADING;
            final long elapsedMs = j - this._startTimeMs;
            if (elapsedMs > this._initialCommitTimeMs) {
                final String tableName = this._segmentName.getTableName();
                this._segmentCompletionManager._commitTimeMap.put(tableName, Long.valueOf(elapsedMs));
            }
            return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
        }

        /** stoppedConsuming in {@code COMMITTER_NOTIFIED}: delegates to {@code processStoppedConsuming} with the flag set to {@code false}. */
        private SegmentCompletionProtocol.Response committerNotifiedStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = false;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * extendBuildTime in {@code COMMITTER_NOTIFIED}.
         *
         * <p>Past the commit deadline the FSM is aborted and HOLD returned. Otherwise the new deadline
         * is {@code now + i seconds}. A deadline more than 1,800,000 ms (1800 s) after the FSM start
         * time is refused: the FSM is aborted and {@code RESP_FAILED} returned.
         *
         * <p>The seconds-to-milliseconds conversion is done in {@code long} arithmetic. The earlier
         * {@code i * 1000} was an {@code int} multiplication that wrapped for large {@code i}, so an
         * oversized request could slip past the cap and move the deadline into the past.
         *
         * @param str id of the requesting instance
         * @param streamPartitionMsgOffset offset supplied with the request
         * @param i requested extension in seconds
         * @param j current time in ms
         * @return {@code RESP_PROCESSED} when the deadline was moved, HOLD when already too late,
         *         {@code RESP_FAILED} when the extension exceeds the cap
         */
        private SegmentCompletionProtocol.Response committerNotifiedExtendBuildTime(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, int i, long j) {
            SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate != null) {
                return tooLate;
            }
            // 1000L forces long multiplication so large extension requests cannot overflow.
            long requestedDeadlineMs = j + (i * 1000L);
            if (requestedDeadlineMs > this._startTimeMs + 1800000) {
                this._logger.warn("Not accepting lease extension from {} startTime={} requestedTime={}", str, Long.valueOf(this._startTimeMs), Long.valueOf(requestedDeadlineMs));
                return abortAndReturnFailed();
            }
            this._maxTimeAllowedToCommitMs = requestedDeadlineMs;
            return SegmentCompletionProtocol.RESP_PROCESSED;
        }

        /** segmentConsumed in {@code COMMITTER_UPLOADING}: delegates to {@code processConsumedAfterCommitStart}. */
        private SegmentCompletionProtocol.Response committerUploadingConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processConsumedAfterCommitStart(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** segmentCommitStart in {@code COMMITTER_UPLOADING}: delegates to {@code processCommitWhileUploading}. */
        private SegmentCompletionProtocol.Response committerUploadingCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processCommitWhileUploading(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** stoppedConsuming in {@code COMMITTER_UPLOADING}: delegates to {@code processStoppedConsuming} with the flag set to {@code false}. */
        private SegmentCompletionProtocol.Response committerUploadingStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = false;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /** segmentConsumed in {@code COMMITTING}: delegates to {@code processConsumedAfterCommitStart}. */
        private SegmentCompletionProtocol.Response committingConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processConsumedAfterCommitStart(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** segmentCommitStart in {@code COMMITTING}: delegates to {@code processCommitWhileUploading}. */
        private SegmentCompletionProtocol.Response committingCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            final SegmentCompletionProtocol.Response response = processCommitWhileUploading(str, streamPartitionMsgOffset, j);
            return response;
        }

        /** stoppedConsuming in {@code COMMITTING}: delegates to {@code processStoppedConsuming} with the flag set to {@code false}. */
        private SegmentCompletionProtocol.Response committingStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = false;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * segmentConsumed in {@code COMMITTED}.
         *
         * @return KEEP when the reported offset equals the winning offset, DISCARD otherwise
         */
        private SegmentCompletionProtocol.Response committedConsumed(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            if (streamPartitionMsgOffset.compareTo(this._winningOffset) == 0) {
                return keep(str, streamPartitionMsgOffset);
            }
            return discard(str, streamPartitionMsgOffset);
        }

        /**
         * segmentCommitStart in {@code COMMITTED}.
         *
         * @return KEEP when the reported offset equals the winning offset, DISCARD otherwise
         */
        private SegmentCompletionProtocol.Response committedCommit(String str, StreamPartitionMsgOffset streamPartitionMsgOffset) {
            if (streamPartitionMsgOffset.compareTo(this._winningOffset) == 0) {
                return keep(str, streamPartitionMsgOffset);
            }
            return discard(str, streamPartitionMsgOffset);
        }

        /** stoppedConsuming in {@code COMMITTED}: delegates to {@code processStoppedConsuming} with the flag set to {@code false}. */
        private SegmentCompletionProtocol.Response committedStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2) {
            final boolean createNew = false;
            return processStoppedConsuming(str, streamPartitionMsgOffset, str2, createNew);
        }

        /**
         * Shared stoppedConsuming handling: logs the event and notifies the segment manager.
         *
         * @param str id of the instance that stopped consuming
         * @param streamPartitionMsgOffset offset at which it stopped
         * @param str2 reason text, used only in the log line
         * @param z logged as "createNew"; not otherwise used by this method
         * @return {@code RESP_PROCESSED} when the segment manager call succeeds, {@code RESP_FAILED}
         *         when it throws
         */
        private SegmentCompletionProtocol.Response processStoppedConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, String str2, boolean z) {
            this._logger.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", str, this._segmentName, streamPartitionMsgOffset, this._state, Boolean.valueOf(z), str2);
            SegmentCompletionProtocol.Response outcome;
            try {
                this._segmentManager.segmentStoppedConsuming(this._segmentName, str);
                outcome = SegmentCompletionProtocol.RESP_PROCESSED;
            } catch (Exception e) {
                this._logger.error("Caught exception while processing stopped CONSUMING segment: {} on instance: {}", this._segmentName.getSegmentName(), str, e);
                outcome = SegmentCompletionProtocol.RESP_FAILED;
            }
            return outcome;
        }

        /**
         * Shared segmentConsumed handling for {@code COMMITTER_UPLOADING} and {@code COMMITTING}.
         *
         * <p>Past the commit deadline the FSM is aborted and the HOLD response is returned. The earlier
         * code returned {@code null} here, throwing away the response the abort had just built and
         * handing callers a null, unlike {@code committerNotifiedConsumed}, which returns it.
         *
         * <p>Within the deadline a non-winner gets CATCHUP when behind the winning offset and HOLD
         * otherwise. The winner reporting again while it is committing aborts the FSM and gets HOLD.
         *
         * @param str id of the reporting instance
         * @param streamPartitionMsgOffset offset the instance reports
         * @param j current time in ms
         * @return the response for the instance; never {@code null}
         */
        private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate != null) {
                // Propagate the HOLD produced by the abort instead of null.
                return tooLate;
            }
            if (!str.equals(this._winner)) {
                // At or ahead of the winning offset: hold. Behind it: catch up.
                return streamPartitionMsgOffset.compareTo(this._winningOffset) < 0 ? catchup(str, streamPartitionMsgOffset) : hold(str, streamPartitionMsgOffset);
            }
            SegmentCompletionManager.LOGGER.warn("{}:Aborting FSM because winner is reporting a segment while it is also committing instance={} offset={} now={}", this._state, str, streamPartitionMsgOffset, Long.valueOf(j));
            return abortAndReturnHold(j, str, streamPartitionMsgOffset);
        }

        /**
         * Commits the segment on behalf of the committing instance.
         *
         * <p>The commit only proceeds when the FSM is in {@code COMMITTER_UPLOADING}. The state is
         * moved to {@code COMMITTING} before any work is done and to {@code COMMITTED} once the
         * segment metadata has been committed.
         *
         * @param params the request parameters (instance id and offset are read from it)
         * @param z when {@code true}, the segment file is committed through the segment manager
         *          before the metadata
         * @param committingSegmentDescriptor descriptor of the segment being committed; its location is
         *          rewritten to a controller download URL when it uses the {@code file} scheme
         * @return {@code RESP_COMMIT_SUCCESS} on success, {@code RESP_FAILED} if the state is wrong
         *         or any commit step throws
         */
        private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params params, boolean z, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String committer = params.getInstanceId();
            final StreamPartitionMsgOffset commitOffset = this._streamPartitionMsgOffsetFactory.create(params.getStreamPartitionMsgOffset());
            final String segmentNameStr = this._segmentName.getSegmentName();

            final boolean uploading = this._state.equals(State.COMMITTER_UPLOADING);
            if (!uploading) {
                // The FSM moved on while the committer was uploading, so the commit is rejected.
                this._logger.warn("State change during upload: state={} segment={} winner={} winningOffset={}", this._state, segmentNameStr, this._winner, this._winningOffset);
                return SegmentCompletionProtocol.RESP_FAILED;
            }

            this._logger.info("Committing segment {} at offset {} winner {}", segmentNameStr, commitOffset, committer);
            this._state = State.COMMITTING;

            if (z) {
                try {
                    this._segmentManager.commitSegmentFile(this._realtimeTableName, committingSegmentDescriptor);
                } catch (Exception fileCommitError) {
                    this._logger.error("Caught exception while committing segment file for segment: {}", segmentNameStr, fileCommitError);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
            }

            try {
                String locationScheme = URIUtils.getUri(committingSegmentDescriptor.getSegmentLocation()).getScheme();
                if ("file".equalsIgnoreCase(locationScheme)) {
                    // Replace a local file location with a download URL served by the controller.
                    String rawTableName = TableNameBuilder.extractRawTableName(this._realtimeTableName);
                    String downloadUrl = URIUtils.constructDownloadUrl(this._controllerVipUrl, rawTableName, segmentNameStr);
                    committingSegmentDescriptor.setSegmentLocation(downloadUrl);
                }
                this._segmentManager.commitSegmentMetadata(this._realtimeTableName, committingSegmentDescriptor);
                this._state = State.COMMITTED;
                this._logger.info("Committed segment {} at offset {} winner {}", segmentNameStr, commitOffset, committer);
                return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
            } catch (Exception metadataCommitError) {
                this._logger.error("Caught exception while committing segment metadata for segment: {}", segmentNameStr, metadataCommitError);
                return SegmentCompletionProtocol.RESP_FAILED;
            }
        }

        /**
         * Handles a commit request that arrives while a committer is already uploading.
         *
         * @param str the instance requesting the commit
         * @param streamPartitionMsgOffset the offset the instance reports
         * @param j the current time in milliseconds
         * @return the too-late response if one is produced, otherwise a new HOLD response
         *         carrying the reported offset
         */
        private SegmentCompletionProtocol.Response processCommitWhileUploading(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            this._logger.info("Processing segmentCommit({}, {})", str, streamPartitionMsgOffset);
            SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate != null) {
                return tooLate;
            }
            SegmentCompletionProtocol.Response.Params holdParams = new SegmentCompletionProtocol.Response.Params()
                    .withStreamPartitionMsgOffset(streamPartitionMsgOffset.toString())
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
            return new SegmentCompletionProtocol.Response(holdParams);
        }

        /**
         * Validates a commit request against the picked winner.
         *
         * @param str the instance requesting the commit
         * @param streamPartitionMsgOffset the offset the instance reports
         * @param j the current time in milliseconds
         * @return the too-late response if one is produced; an abort-and-hold response when the
         *         winner reports an offset different from the winning offset; {@code null} when
         *         the request is acceptable
         */
        private SegmentCompletionProtocol.Response checkBadCommitRequest(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate != null) {
                return tooLate;
            }
            if (str.equals(this._winner) && streamPartitionMsgOffset.compareTo(this._winningOffset) != 0) {
                // The winner is trying to commit at an offset other than the one it won with.
                this._logger.warn("{}:Aborting FSM (bad commit req) instance={} offset={} now={} winning={}", this._state, str, streamPartitionMsgOffset, j, this._winningOffset);
                return abortAndReturnHold(j, str, streamPartitionMsgOffset);
            }
            return null;
        }

        /**
         * Handles a commit request received before a committer has been chosen.
         *
         * @param str the instance requesting the commit
         * @param streamPartitionMsgOffset the offset the instance reports
         * @param j the current time in milliseconds
         * @return the too-late response if one is produced, otherwise the result of {@code hold}
         */
        private SegmentCompletionProtocol.Response processCommitWhileHoldingOrPartialConsuming(String str, StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
            this._logger.info("Processing segmentCommit({}, {})", str, streamPartitionMsgOffset);
            SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(j, str, streamPartitionMsgOffset);
            if (tooLate == null) {
                return hold(str, streamPartitionMsgOffset);
            }
            return tooLate;
        }

        /**
         * Decides whether a winner can be chosen now and, if so, records it in
         * {@code _winner} and {@code _winningOffset}.
         *
         * <p>A single reporter that stopped for the row-limit or end-of-partition-group reason wins
         * immediately. Otherwise a winner is chosen once the time limit has passed or the expected
         * number of replicas have reported; the highest offset wins, and {@code str} is preferred
         * when it is tied with that highest offset.
         *
         * @param str the preferred instance (the one currently reporting)
         * @param j the current time in milliseconds
         * @param str2 the reason the instance stopped consuming
         * @return {@code true} if a winner was picked, {@code false} if it is too early
         */
        private boolean isWinnerPicked(String str, long j, String str2) {
            final boolean immediateStopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT.equals(str2)
                    || SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP.equals(str2);
            if (immediateStopReason && this._commitStateMap.size() == 1) {
                this._winner = str;
                this._winningOffset = this._commitStateMap.get(str);
                return true;
            }

            // Keep waiting unless we ran out of time or heard from every replica we expect.
            if (!(j > this._maxTimeToPickWinnerMs || this._commitStateMap.size() == numReplicasToLookFor())) {
                return false;
            }

            this._logger.info("{}:Picking winner time={} size={}", this._state, j - this._startTimeMs, this._commitStateMap.size());
            StreamPartitionMsgOffset highestOffset = null;
            String leadingInstance = null;
            for (Map.Entry<String, StreamPartitionMsgOffset> candidate : this._commitStateMap.entrySet()) {
                StreamPartitionMsgOffset candidateOffset = candidate.getValue();
                if (highestOffset == null || candidateOffset.compareTo(highestOffset) > 0) {
                    highestOffset = candidateOffset;
                    leadingInstance = candidate.getKey();
                }
            }
            this._winningOffset = highestOffset;
            // On a tie, favour the instance that is reporting right now.
            boolean preferredIsTied = this._commitStateMap.get(str).compareTo(highestOffset) == 0;
            this._winner = preferredIsTied ? str : leadingInstance;
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager$State.class */
    /**
     * States tracked by a segment-completion FSM in its {@code _state} field.
     */
    public enum State {
        PARTIAL_CONSUMING,
        HOLDING,
        COMMITTER_DECIDED,
        COMMITTER_NOTIFIED,
        // The only state in which commitSegment proceeds; in any other state it returns a failed response.
        COMMITTER_UPLOADING,
        // Set by commitSegment before the segment file/metadata are committed.
        COMMITTING,
        // Set by commitSegment once the segment metadata has been committed successfully.
        COMMITTED,
        ABORTED
    }

    /**
     * Returns the maximum commit time, in seconds, that applies to all segments.
     *
     * @return 1800 seconds (30 minutes)
     */
    public static int getMaxCommitTimeForAllSegmentsSeconds() {
        final int thirtyMinutesInSeconds = 30 * 60;
        return thirtyMinutesInSeconds;
    }

    public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, int i) {
        this._helixManager = helixManager;
        this._segmentManager = pinotLLCRealtimeSegmentManager;
        this._controllerMetrics = controllerMetrics;
        this._leadControllerManager = leadControllerManager;
        SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(i, TimeUnit.SECONDS));
        this._fsmLocks = new Lock[20];
        for (int i2 = 0; i2 < 20; i2++) {
            this._fsmLocks[i2] = new ReentrantLock();
        }
    }

    /**
     * Reports whether split commit is enabled, as answered by the segment manager.
     *
     * @return the value of the segment manager's split-commit setting
     */
    public boolean isSplitCommitEnabled() {
        final boolean splitCommitEnabled = this._segmentManager.getIsSplitCommitEnabled();
        return splitCommitEnabled;
    }

    /**
     * Returns the controller VIP URL as provided by the segment manager.
     *
     * @return the controller VIP URL
     */
    public String getControllerVipUrl() {
        final String vipUrl = this._segmentManager.getControllerVipUrl();
        return vipUrl;
    }

    /**
     * Returns the current wall-clock time; protected so that subclasses can override it.
     *
     * @return the current time in milliseconds from {@link System#currentTimeMillis()}
     */
    protected long getCurrentTimeMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Builds the offset factory for the stream backing the given segment's realtime table.
     *
     * @param lLCSegmentName the segment whose table's stream configuration is used
     * @return the offset factory created by the table's stream consumer factory
     */
    protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLCSegmentName lLCSegmentName) {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName());
        TableConfig tableConfig = this._segmentManager.getTableConfig(realtimeTableName);
        PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
        return StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
    }

    /**
     * Returns the FSM registered for the segment, creating and registering one if none exists.
     *
     * <p>Lookup and creation run under one of the striped locks in {@code _fsmLocks}, selected
     * by the hash of the segment name. A new FSM is created from the segment's ZK metadata:
     * in-commit when the segment status is {@code DONE}, stopped-consuming when the message type
     * is {@code MSG_TYPE_STOPPED_CONSUMING}, and holding otherwise.
     *
     * @param lLCSegmentName the segment to find or create an FSM for
     * @param str the protocol message type that triggered the lookup
     * @return the existing or newly created FSM
     * @throws RuntimeException wrapping any exception raised while looking up or creating the FSM
     */
    private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName lLCSegmentName, String str) {
        String segmentName = lLCSegmentName.getSegmentName();
        // Mask the sign bit so the index is never negative; size the modulus from the array itself.
        int lockIndex = (segmentName.hashCode() & Integer.MAX_VALUE) % this._fsmLocks.length;
        Lock lock = this._fsmLocks[lockIndex];
        // Acquire before entering the try so that the finally block only unlocks a lock we hold.
        lock.lock();
        try {
            SegmentCompletionFSM segmentCompletionFSM = this._fsmMap.get(segmentName);
            if (segmentCompletionFSM != null) {
                return segmentCompletionFSM;
            }
            String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(lLCSegmentName.getTableName());
            SegmentZKMetadata segmentZKMetadata = this._segmentManager.getSegmentZKMetadata(realtimeTableName, segmentName, null);
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                // The segment is already committed; start the FSM from its recorded end offset.
                StreamPartitionMsgOffset endOffset = getStreamPartitionMsgOffsetFactory(lLCSegmentName).create(segmentZKMetadata.getEndOffset());
                segmentCompletionFSM = SegmentCompletionFSM.fsmInCommit(this._segmentManager, this, lLCSegmentName, segmentZKMetadata.getNumReplicas(), endOffset);
            } else if (str.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
                segmentCompletionFSM = SegmentCompletionFSM.fsmStoppedConsuming(this._segmentManager, this, lLCSegmentName, segmentZKMetadata.getNumReplicas());
            } else {
                segmentCompletionFSM = SegmentCompletionFSM.fsmInHolding(this._segmentManager, this, lLCSegmentName, segmentZKMetadata.getNumReplicas());
            }
            LOGGER.info("Created FSM {}", segmentCompletionFSM);
            this._fsmMap.put(segmentName, segmentCompletionFSM);
            return segmentCompletionFSM;
        } catch (Exception e) {
            LOGGER.error("Exception getting FSM for segment {}", segmentName, e);
            throw new RuntimeException("Exception getting FSM for segment " + segmentName, e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Processes a "segment consumed" message from a server.
     *
     * <p>The reported offset is parsed before the FSM is consulted, so a parsing failure
     * propagates to the caller. Exceptions from the FSM lookup or call are logged and reported as
     * {@code RESP_FAILED}. The FSM is removed from the map once it reports that it is done.
     *
     * @param params the request parameters (segment name, instance id, reason, offset)
     * @return {@code RESP_NOT_LEADER} when this controller is not the table's leader or Helix is
     *         disconnected; otherwise the FSM's response, or {@code RESP_FAILED} on exception
     */
    public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params params) {
        final String segmentName = params.getSegmentName();
        final LLCSegmentName llcName = new LLCSegmentName(segmentName);
        if (!(isLeader(llcName.getTableName()) && this._helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }
        final String instanceId = params.getInstanceId();
        final String stopReason = params.getReason();
        final StreamPartitionMsgOffset offset = getStreamPartitionMsgOffsetFactory(llcName).create(params.getStreamPartitionMsgOffset());
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(llcName, SegmentCompletionProtocol.MSG_TYPE_CONSUMED);
            result = fsm.segmentConsumed(instanceId, offset, stopReason);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentConsumed for segment {}", segmentName, e);
        }
        if (fsm == null || !fsm.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", fsm.toString());
        this._fsmMap.remove(segmentName);
        return result;
    }

    /**
     * Processes a "segment commit start" message from a server.
     *
     * <p>The reported offset is parsed before the FSM is consulted, so a parsing failure
     * propagates to the caller. Exceptions from the FSM lookup or call are logged and reported as
     * {@code RESP_FAILED}. The FSM is removed from the map once it reports that it is done.
     *
     * @param params the request parameters (segment name, instance id, offset)
     * @return {@code RESP_NOT_LEADER} when this controller is not the table's leader or Helix is
     *         disconnected; otherwise the FSM's response, or {@code RESP_FAILED} on exception
     */
    public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
        final String segmentName = params.getSegmentName();
        final LLCSegmentName llcName = new LLCSegmentName(segmentName);
        if (!(isLeader(llcName.getTableName()) && this._helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }
        final String instanceId = params.getInstanceId();
        final StreamPartitionMsgOffset offset = getStreamPartitionMsgOffsetFactory(llcName).create(params.getStreamPartitionMsgOffset());
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(llcName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.segmentCommitStart(instanceId, offset);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentCommitStart for segment {}", segmentName, e);
        }
        if (fsm == null || !fsm.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", fsm.toString());
        this._fsmMap.remove(segmentName);
        return result;
    }

    /**
     * Processes a request from a server to extend the time allowed for building a segment.
     *
     * <p>The reported offset is parsed before the FSM is consulted, so a parsing failure
     * propagates to the caller. Exceptions from the FSM lookup or call are logged and reported as
     * {@code RESP_FAILED}. The FSM is removed from the map once it reports that it is done.
     *
     * @param params the request parameters (segment name, instance id, offset, extra seconds)
     * @return {@code RESP_NOT_LEADER} when this controller is not the table's leader or Helix is
     *         disconnected; otherwise the FSM's response, or {@code RESP_FAILED} on exception
     */
    public SegmentCompletionProtocol.Response extendBuildTime(SegmentCompletionProtocol.Request.Params params) {
        final String segmentName = params.getSegmentName();
        final LLCSegmentName llcName = new LLCSegmentName(segmentName);
        if (!(isLeader(llcName.getTableName()) && this._helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }
        final String instanceId = params.getInstanceId();
        final StreamPartitionMsgOffset offset = getStreamPartitionMsgOffsetFactory(llcName).create(params.getStreamPartitionMsgOffset());
        final int extraSeconds = params.getExtraTimeSec();
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(llcName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.extendBuildTime(instanceId, offset, extraSeconds);
        } catch (Exception e) {
            LOGGER.error("Caught exception in extendBuildTime for segment {}", segmentName, e);
        }
        if (fsm == null || !fsm.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", fsm.toString());
        this._fsmMap.remove(segmentName);
        return result;
    }

    /**
     * Processes a "stopped consuming" message from a server.
     *
     * <p>The reported offset is parsed before the FSM is consulted, so a parsing failure
     * propagates to the caller. Exceptions from the FSM lookup or call are logged and reported as
     * {@code RESP_FAILED}. The FSM is removed from the map once it reports that it is done.
     *
     * @param params the request parameters (segment name, instance id, offset, reason)
     * @return {@code RESP_NOT_LEADER} when this controller is not the table's leader or Helix is
     *         disconnected; otherwise the FSM's response, or {@code RESP_FAILED} on exception
     */
    public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params params) {
        final String segmentName = params.getSegmentName();
        final LLCSegmentName llcName = new LLCSegmentName(segmentName);
        if (!(isLeader(llcName.getTableName()) && this._helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }
        final String instanceId = params.getInstanceId();
        final StreamPartitionMsgOffset offset = getStreamPartitionMsgOffsetFactory(llcName).create(params.getStreamPartitionMsgOffset());
        final String stopReason = params.getReason();
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(llcName, SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING);
            result = fsm.stoppedConsuming(instanceId, offset, stopReason);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentStoppedConsuming for segment {}", segmentName, e);
        }
        if (fsm == null || !fsm.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", fsm.toString());
        this._fsmMap.remove(segmentName);
        return result;
    }

    /**
     * Processes a "segment commit end" message from a server by forwarding it to the segment's FSM.
     *
     * <p>Exceptions from the FSM lookup or call are logged and reported as {@code RESP_FAILED}.
     * The FSM is removed from the map once it reports that it is done.
     *
     * @param params the request parameters (the segment name is read here; the rest is forwarded)
     * @param z forwarded unchanged to the FSM's {@code segmentCommitEnd}
     * @param z2 forwarded unchanged to the FSM's {@code segmentCommitEnd}
     * @param committingSegmentDescriptor descriptor of the segment being committed
     * @return {@code RESP_NOT_LEADER} when this controller is not the table's leader or Helix is
     *         disconnected; otherwise the FSM's response, or {@code RESP_FAILED} on exception
     */
    public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params params, boolean z, boolean z2, CommittingSegmentDescriptor committingSegmentDescriptor) {
        final String segmentName = params.getSegmentName();
        final LLCSegmentName llcName = new LLCSegmentName(segmentName);
        if (!(isLeader(llcName.getTableName()) && this._helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        SegmentCompletionFSM fsm = null;
        try {
            fsm = lookupOrCreateFsm(llcName, SegmentCompletionProtocol.MSG_TYPE_COMMIT);
            result = fsm.segmentCommitEnd(params, z, z2, committingSegmentDescriptor);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentName, e);
        }
        if (fsm == null || !fsm.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", fsm.toString());
        this._fsmMap.remove(segmentName);
        return result;
    }

    /**
     * Checks with the lead-controller manager whether this controller leads the given table.
     *
     * @param str the table name to check
     * @return {@code true} if this controller is the leader for the table
     */
    @VisibleForTesting
    protected boolean isLeader(String str) {
        final boolean leaderForTable = this._leadControllerManager.isLeaderForTable(str);
        return leaderForTable;
    }
}
