package org.apache.pinot.core.data.manager.realtime;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.CompletionConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.class */
public class RealtimeSegmentDataManager extends SegmentDataManager {
    public static final String RESOURCE_TEMP_DIR_NAME = "_tmp";
    private static final int MINIMUM_CONSUME_TIME_MINUTES = 10;
    private static final long TIME_THRESHOLD_FOR_LOG_MINUTES = 1;
    private static final long TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS = 1;
    private static final int MSG_COUNT_THRESHOLD_FOR_LOG = 100000;
    private static final int BUILD_TIME_LEASE_SECONDS = 30;
    private static final int MAX_CONSECUTIVE_ERROR_COUNT = 5;
    private final SegmentZKMetadata _segmentZKMetadata;
    private final TableConfig _tableConfig;
    private final RealtimeTableDataManager _realtimeTableDataManager;
    private final StreamDataDecoder _streamDataDecoder;
    private final int _segmentMaxRowCount;
    private final String _resourceDataDir;
    private final IndexLoadingConfig _indexLoadingConfig;
    private final Schema _schema;
    private final Semaphore _partitionGroupConsumerSemaphore;
    private final AtomicBoolean _acquiredConsumerSemaphore;
    private final ServerMetrics _serverMetrics;
    private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
    private final BooleanSupplier _isReadyToConsumeData;
    private final MutableSegmentImpl _realtimeSegment;
    private volatile StreamPartitionMsgOffset _currentOffset;
    private volatile State _state;
    private final String _segmentNameStr;
    private final SegmentVersion _segmentVersion;
    private final SegmentBuildTimeLeaseExtender _leaseExtender;
    private SegmentBuildDescriptor _segmentBuildDescriptor;
    private final StreamConsumerFactory _streamConsumerFactory;
    private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
    private volatile StreamPartitionMsgOffset _finalOffset;
    private static final int MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS = 31;
    private Thread _consumerThread;
    private final int _partitionGroupId;
    private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
    final String _clientId;
    private final RecordEnricherPipeline _recordEnricherPipeline;
    private final TransformPipeline _transformPipeline;
    private final File _resourceTmpDir;
    private final String _tableNameWithType;
    private final ColumnIndicesForRealtimeTable _columnIndicesForRealtimeTable;
    private final Logger _segmentLogger;
    private final String _tableStreamName;
    private final PinotDataBufferMemoryManager _memoryManager;
    private final String _instanceId;
    private final ServerSegmentCompletionProtocolHandler _protocolHandler;
    private final long _consumeStartTime;
    private final StreamPartitionMsgOffset _startOffset;
    private final StreamConfig _streamConfig;
    private RowMetadata _lastRowMetadata;
    private final Semaphore _segBuildSemaphore;
    private final boolean _isOffHeap;
    private final boolean _nullHandlingEnabled;
    private final SegmentCommitterFactory _segmentCommitterFactory;
    private final RealtimeConsumptionRateManager.ConsumptionRateLimiter _partitionRateLimiter;
    private final RealtimeConsumptionRateManager.ConsumptionRateLimiter _serverRateLimiter;
    private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
    private final CommonConstants.Segment.Realtime.CompletionMode _segmentCompletionMode;
    private final boolean _allowConsumptionDuringCommit;
    private boolean _trackFilteredMessageOffsets;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile int _numRowsConsumed = 0;
    private volatile int _numRowsIndexed = 0;
    private volatile int _numRowsErrored = 0;
    private volatile int _consecutiveErrorCount = 0;
    private long _startTimeMs = 0;
    private final IdleTimer _idleTimer = new IdleTimer();
    private volatile long _consumeEndTime = 0;
    private volatile boolean _hasMessagesFetched = false;
    private volatile boolean _endOfPartitionGroup = false;
    private volatile boolean _forceCommitMessageReceived = false;
    private volatile boolean _shouldStop = false;
    private PartitionGroupConsumer _partitionGroupConsumer = null;
    private StreamMetadataProvider _partitionMetadataProvider = null;
    private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
    private long _lastConsumedTimestampMs = -1;
    private long _lastLogTime = 0;
    private int _lastConsumedCount = 0;
    private String _stopReason = null;
    private final List<String> _filteredMessageOffsets = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus;

        static {
            try {
                $SwitchMap$org$apache$pinot$spi$utils$CommonConstants$Segment$Realtime$CompletionMode[CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pinot$spi$utils$CommonConstants$Segment$Realtime$CompletionMode[CommonConstants.Segment.Realtime.CompletionMode.DEFAULT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus = new int[SegmentCompletionProtocol.ControllerResponseStatus.values().length];
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.HOLD.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.DISCARD.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.KEEP.ordinal()] = 5;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[SegmentCompletionProtocol.ControllerResponseStatus.COMMIT.ordinal()] = 6;
            } catch (NoSuchFieldError e8) {
            }
            $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State = new int[State.values().length];
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.INITIAL_CONSUMING.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.CATCHING_UP.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.CONSUMING_TO_ONLINE.ordinal()] = 3;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.COMMITTED.ordinal()] = 4;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.RETAINED.ordinal()] = 5;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.DISCARDED.ordinal()] = 6;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.ERROR.ordinal()] = 7;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$pinot$core$data$manager$realtime$RealtimeSegmentDataManager$State[State.HOLDING.ordinal()] = 8;
            } catch (NoSuchFieldError e16) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager$ConsumptionStopIndicator.class */
    public static class ConsumptionStopIndicator {
        final StreamPartitionMsgOffset _offset;
        final String _segmentName;
        final String _instanceId;
        final Logger _logger;
        final ServerSegmentCompletionProtocolHandler _protocolHandler;
        final String _reason;

        private ConsumptionStopIndicator(StreamPartitionMsgOffset streamPartitionMsgOffset, String str, String str2, ServerSegmentCompletionProtocolHandler serverSegmentCompletionProtocolHandler, String str3, Logger logger) {
            this._offset = streamPartitionMsgOffset;
            this._segmentName = str;
            this._instanceId = str2;
            this._protocolHandler = serverSegmentCompletionProtocolHandler;
            this._logger = logger;
            this._reason = str3;
        }

        SegmentCompletionProtocol.Response postSegmentStoppedConsuming() {
            SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
            params.withStreamPartitionMsgOffset(this._offset.toString()).withReason(this._reason).withSegmentName(this._segmentName).withInstanceId(this._instanceId);
            SegmentCompletionProtocol.Response segmentStoppedConsuming = this._protocolHandler.segmentStoppedConsuming(params);
            this._logger.info("Got response {}", segmentStoppedConsuming.toJsonString());
            return segmentStoppedConsuming;
        }
    }

    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager$PartitionConsumer.class */
    public class PartitionConsumer implements Runnable {
        public PartitionConsumer() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:13:0x004a, code lost:
        
            if (r9.this$0._partitionUpsertMetadataManager == null) goto L105;
         */
        /* JADX WARN: Code restructure failed: missing block: B:15:0x0059, code lost:
        
            if (r9.this$0._tableConfig.getUpsertMetadataTTL() <= 0.0d) goto L14;
         */
        /* JADX WARN: Code restructure failed: missing block: B:16:0x005c, code lost:
        
            r9.this$0._partitionUpsertMetadataManager.takeSnapshot();
            r9.this$0._partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
         */
        /* JADX WARN: Code restructure failed: missing block: B:17:0x0077, code lost:
        
            r9.this$0._partitionUpsertMetadataManager.removeExpiredPrimaryKeys();
            r9.this$0._partitionUpsertMetadataManager.takeSnapshot();
         */
        /* JADX WARN: Code restructure failed: missing block: B:20:0x0099, code lost:
        
            if (r9.this$0._state.isFinal() != false) goto L93;
         */
        /* JADX WARN: Code restructure failed: missing block: B:22:0x00a6, code lost:
        
            if (r9.this$0._state.shouldConsume() == false) goto L20;
         */
        /* JADX WARN: Code restructure failed: missing block: B:23:0x00a9, code lost:
        
            r9.this$0.consumeLoop();
         */
        /* JADX WARN: Code restructure failed: missing block: B:24:0x00b1, code lost:
        
            r9.this$0._serverMetrics.setValueOfTableGauge(r9.this$0._clientId, org.apache.pinot.common.metrics.ServerGauge.LLC_PARTITION_CONSUMING, 0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:25:0x00cd, code lost:
        
            if (r9.this$0._shouldStop == false) goto L23;
         */
        /* JADX WARN: Code restructure failed: missing block: B:27:0x00dd, code lost:
        
            if (r9.this$0._state != org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.INITIAL_CONSUMING) goto L26;
         */
        /* JADX WARN: Code restructure failed: missing block: B:28:0x00e0, code lost:
        
            r10 = r9.this$0.now();
            r9.this$0._serverMetrics.setValueOfTableGauge(r9.this$0._clientId, org.apache.pinot.common.metrics.ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS, java.util.concurrent.TimeUnit.MILLISECONDS.toSeconds(r10 - r9.this$0._startTimeMs));
         */
        /* JADX WARN: Code restructure failed: missing block: B:29:0x0145, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.HOLDING;
            r0 = r9.this$0.postSegmentConsumedMsg();
         */
        /* JADX WARN: Code restructure failed: missing block: B:30:0x0168, code lost:
        
            switch(org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.AnonymousClass1.$SwitchMap$org$apache$pinot$common$protocols$SegmentCompletionProtocol$ControllerResponseStatus[r0.getStatus().ordinal()]) {
                case 1: goto L31;
                case 2: goto L32;
                case 3: goto L36;
                case 4: goto L37;
                case 5: goto L38;
                case 6: goto L53;
                default: goto L68;
            };
         */
        /* JADX WARN: Code restructure failed: missing block: B:31:0x0190, code lost:
        
            r9.this$0._segmentLogger.warn("Got not leader response");
            r9.this$0.hold();
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x01a8, code lost:
        
            r0 = r9.this$0.extractOffset(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x01c1, code lost:
        
            if (r0.compareTo(r9.this$0._currentOffset) > 0) goto L35;
         */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x01c4, code lost:
        
            r9.this$0._segmentLogger.error("Invalid catchup offset {} in controller response, current offset {}", r0, r9.this$0._currentOffset);
            r9.this$0.hold();
         */
        /* JADX WARN: Code restructure failed: missing block: B:38:0x01e5, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.CATCHING_UP;
            r9.this$0._finalOffset = r0;
            r12 = r9.this$0.now();
         */
        /* JADX WARN: Code restructure failed: missing block: B:40:0x0203, code lost:
        
            r9.this$0.hold();
         */
        /* JADX WARN: Code restructure failed: missing block: B:42:0x020d, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.DISCARDED;
         */
        /* JADX WARN: Code restructure failed: missing block: B:45:0x0224, code lost:
        
            if (r9.this$0._segmentCompletionMode != org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD) goto L41;
         */
        /* JADX WARN: Code restructure failed: missing block: B:46:0x0227, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.DISCARDED;
         */
        /* JADX WARN: Code restructure failed: missing block: B:48:0x0234, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.RETAINING;
            r0 = r9.this$0._realtimeTableDataManager.getSegmentLock(r9.this$0._segmentNameStr);
            r0.lockInterruptibly();
         */
        /* JADX WARN: Code restructure failed: missing block: B:4:0x0021, code lost:
        
            if (r9.this$0._isReadyToConsumeData.getAsBoolean() == false) goto L5;
         */
        /* JADX WARN: Code restructure failed: missing block: B:51:0x025f, code lost:
        
            if (r9.this$0.buildSegmentAndReplace() == false) goto L45;
         */
        /* JADX WARN: Code restructure failed: missing block: B:52:0x0262, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.RETAINED;
         */
        /* JADX WARN: Code restructure failed: missing block: B:54:0x0290, code lost:
        
            r0.unlock();
         */
        /* JADX WARN: Code restructure failed: missing block: B:57:0x026f, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.ERROR;
            r9.this$0._segmentLogger.error("Could not build segment for {}", r9.this$0._segmentNameStr);
         */
        /* JADX WARN: Code restructure failed: missing block: B:59:0x0298, code lost:
        
            r20 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:5:0x0024, code lost:
        
            java.lang.Thread.sleep(org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
         */
        /* JADX WARN: Code restructure failed: missing block: B:62:0x02a3, code lost:
        
            throw r20;
         */
        /* JADX WARN: Code restructure failed: missing block: B:64:0x02a7, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.COMMITTING;
            r9.this$0._currentOffset = r9.this$0._partitionGroupConsumer.checkpoint(r9.this$0._currentOffset);
            r0 = r9.this$0._realtimeTableDataManager.getSegmentLock(r9.this$0._segmentNameStr);
            r0.lockInterruptibly();
         */
        /* JADX WARN: Code restructure failed: missing block: B:66:0x02e5, code lost:
        
            r9.this$0.buildSegmentForCommit(r0.getBuildTimeSeconds() * 1000);
         */
        /* JADX WARN: Code restructure failed: missing block: B:67:0x0300, code lost:
        
            if (r9.this$0._segmentBuildDescriptor != null) goto L58;
         */
        /* JADX WARN: Code restructure failed: missing block: B:68:0x0303, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.ERROR;
            r9.this$0._segmentLogger.error("Could not build segment for {}", r9.this$0._segmentNameStr);
         */
        /* JADX WARN: Code restructure failed: missing block: B:69:0x0324, code lost:
        
            r0.unlock();
         */
        /* JADX WARN: Code restructure failed: missing block: B:6:0x0031, code lost:
        
            if (r9.this$0._shouldStop != false) goto L90;
         */
        /* JADX WARN: Code restructure failed: missing block: B:72:0x0338, code lost:
        
            if (r9.this$0.commitSegment(r0.getControllerVipUrl()) == false) goto L62;
         */
        /* JADX WARN: Code restructure failed: missing block: B:73:0x033b, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.COMMITTED;
         */
        /* JADX WARN: Code restructure failed: missing block: B:74:0x0347, code lost:
        
            r0.unlock();
         */
        /* JADX WARN: Code restructure failed: missing block: B:76:0x034f, code lost:
        
            r0.unlock();
         */
        /* JADX WARN: Code restructure failed: missing block: B:77:0x0365, code lost:
        
            r9.this$0._state = org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.HOLDING;
            r9.this$0._segmentLogger.info("Could not commit segment. Retrying after hold");
            r9.this$0.hold();
         */
        /* JADX WARN: Code restructure failed: missing block: B:80:0x0359, code lost:
        
            r22 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:83:0x0364, code lost:
        
            throw r22;
         */
        /* JADX WARN: Code restructure failed: missing block: B:84:0x0387, code lost:
        
            r9.this$0._segmentLogger.error("Holding after response from Controller: {}", r0.toJsonString());
            r9.this$0.hold();
         */
        /* JADX WARN: Code restructure failed: missing block: B:87:0x0118, code lost:
        
            if (r9.this$0._state != org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.State.CATCHING_UP) goto L29;
         */
        /* JADX WARN: Code restructure failed: missing block: B:88:0x011b, code lost:
        
            r14 = r14 + (r9.this$0.now() - r12);
            r9.this$0._serverMetrics.setValueOfTableGauge(r9.this$0._clientId, org.apache.pinot.common.metrics.ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS, java.util.concurrent.TimeUnit.MILLISECONDS.toSeconds(r14));
         */
        /* JADX WARN: Code restructure failed: missing block: B:8:0x0040, code lost:
        
            if (r9.this$0._isReadyToConsumeData.getAsBoolean() == false) goto L91;
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 1151
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.PartitionConsumer.run():void");
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager$SegmentBuildDescriptor.class */
    public class SegmentBuildDescriptor {
        final File _segmentTarFile;
        final Map<String, File> _metadataFileMap;
        final StreamPartitionMsgOffset _offset;
        final long _waitTimeMillis;
        final long _buildTimeMillis;
        final long _segmentSizeBytes;

        public SegmentBuildDescriptor(@Nullable File file, @Nullable Map<String, File> map, StreamPartitionMsgOffset streamPartitionMsgOffset, long j, long j2, long j3) {
            this._segmentTarFile = file;
            this._metadataFileMap = map;
            this._offset = RealtimeSegmentDataManager.this._streamPartitionMsgOffsetFactory.create(streamPartitionMsgOffset);
            this._buildTimeMillis = j;
            this._waitTimeMillis = j2;
            this._segmentSizeBytes = j3;
        }

        public StreamPartitionMsgOffset getOffset() {
            return this._offset;
        }

        public long getBuildTimeMillis() {
            return this._buildTimeMillis;
        }

        public long getWaitTimeMillis() {
            return this._waitTimeMillis;
        }

        @Nullable
        public File getSegmentTarFile() {
            return this._segmentTarFile;
        }

        @Nullable
        public Map<String, File> getMetadataFiles() {
            return this._metadataFileMap;
        }

        public long getSegmentSizeBytes() {
            return this._segmentSizeBytes;
        }

        public void deleteSegmentFile() {
            if (this._segmentTarFile != null) {
                FileUtils.deleteQuietly(this._segmentTarFile);
            }
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager$State.class */
    public enum State {
        INITIAL_CONSUMING,
        CATCHING_UP,
        HOLDING,
        CONSUMING_TO_ONLINE,
        RETAINING,
        COMMITTING,
        DISCARDED,
        RETAINED,
        COMMITTED,
        ERROR;

        public boolean shouldConsume() {
            return equals(INITIAL_CONSUMING) || equals(CATCHING_UP) || equals(CONSUMING_TO_ONLINE);
        }

        public boolean isFinal() {
            return equals(ERROR) || equals(COMMITTED) || equals(RETAINED) || equals(DISCARDED);
        }
    }

    private boolean endCriteriaReached() {
        Preconditions.checkState(this._state.shouldConsume(), "Incorrect state %s", this._state);
        long now = now();
        switch (this._state) {
            case INITIAL_CONSUMING:
                if (now >= this._consumeEndTime) {
                    if (this._hasMessagesFetched) {
                        this._segmentLogger.info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}", Long.valueOf(this._startTimeMs), Long.valueOf(now), Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed));
                        this._stopReason = SegmentCompletionProtocol.REASON_TIME_LIMIT;
                        return true;
                    }
                    this._segmentLogger.info("No events came in, extending time by {} hours", (Object) 1L);
                    this._consumeEndTime += TimeUnit.HOURS.toMillis(1L);
                    return false;
                }
                if (this._numRowsIndexed >= this._segmentMaxRowCount) {
                    this._segmentLogger.info("Stopping consumption due to row limit nRows={} numRowsIndexed={}, numRowsConsumed={}", Integer.valueOf(this._segmentMaxRowCount), Integer.valueOf(this._numRowsIndexed), Integer.valueOf(this._numRowsConsumed));
                    this._stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
                    return true;
                }
                if (this._endOfPartitionGroup) {
                    this._segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}", Integer.valueOf(this._segmentMaxRowCount), Integer.valueOf(this._numRowsIndexed), Integer.valueOf(this._numRowsConsumed));
                    this._stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
                    return true;
                }
                if (!this._forceCommitMessageReceived) {
                    return false;
                }
                this._segmentLogger.info("Stopping consumption due to force commit - numRowsConsumed={} numRowsIndexed={}", Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed));
                this._stopReason = SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
                return true;
            case CATCHING_UP:
                this._stopReason = null;
                if (this._currentOffset.compareTo(this._finalOffset) == 0) {
                    this._segmentLogger.info("Caught up to offset={}, state={}", this._finalOffset, this._state.toString());
                    return true;
                }
                if (this._currentOffset.compareTo(this._finalOffset) <= 0) {
                    return false;
                }
                this._segmentLogger.error("Offset higher in state={}, current={}, final={}", this._state.toString(), this._currentOffset, this._finalOffset);
                throw new RuntimeException("Past max offset");
            case CONSUMING_TO_ONLINE:
                if (this._currentOffset.compareTo(this._finalOffset) == 0) {
                    this._segmentLogger.info("Caught up to offset={}, state={}", this._finalOffset, this._state.toString());
                    return true;
                }
                if (now >= this._consumeEndTime) {
                    this._segmentLogger.info("Past max time budget: offset={}, state={}", this._currentOffset, this._state.toString());
                    return true;
                }
                if (this._currentOffset.compareTo(this._finalOffset) <= 0) {
                    return false;
                }
                this._segmentLogger.error("Offset higher in state={}, current={}, final={}", this._state.toString(), this._currentOffset, this._finalOffset);
                throw new RuntimeException("Past max offset");
            default:
                this._segmentLogger.error("Illegal state: {}", this._state);
                throw new RuntimeException("Illegal state to consume");
        }
    }

    private void handleTransientStreamErrors(Exception exc) throws Exception {
        this._consecutiveErrorCount++;
        this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
        this._serverMetrics.addMeteredTableValue(this._tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
        if (this._consecutiveErrorCount > 5) {
            this._segmentLogger.warn("Stream transient exception when fetching messages, stopping consumption after {} attempts", Integer.valueOf(this._consecutiveErrorCount), exc);
            throw exc;
        }
        this._segmentLogger.warn("Stream transient exception when fetching messages, retrying (count={})", Integer.valueOf(this._consecutiveErrorCount), exc);
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        recreateStreamConsumer("Too many transient errors");
    }

    protected boolean consumeLoop() throws Exception {
        boolean processStreamEvents;
        removeSegmentFile();
        this._numRowsErrored = 0;
        long idleTimeoutMillis = this._streamConfig.getIdleTimeoutMillis();
        this._idleTimer.init();
        StreamPartitionMsgOffset create = this._streamPartitionMsgOffsetFactory.create(this._currentOffset);
        this._segmentLogger.info("Starting consumption loop start offset {}, finalOffset {}", this._currentOffset, this._finalOffset);
        while (!this._shouldStop && !endCriteriaReached()) {
            this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 1L);
            try {
                MessageBatch fetchMessages = this._partitionGroupConsumer.fetchMessages(this._currentOffset, this._streamConfig.getFetchTimeoutMillis());
                this._serverMetrics.addMeteredTableValue(this._clientId, ServerMeter.REALTIME_ROWS_FETCHED, fetchMessages.getUnfilteredMessageCount());
                if (this._segmentLogger.isDebugEnabled()) {
                    this._segmentLogger.debug("message batch received. filtered={} unfiltered={} endOfPartitionGroup={}", Integer.valueOf(fetchMessages.getMessageCount()), Integer.valueOf(fetchMessages.getUnfilteredMessageCount()), Boolean.valueOf(fetchMessages.isEndOfPartitionGroup()));
                }
                this._endOfPartitionGroup = fetchMessages.isEndOfPartitionGroup();
                this._consecutiveErrorCount = 0;
                reportDataLoss(fetchMessages);
                processStreamEvents = processStreamEvents(fetchMessages, 100L);
                if (this._currentOffset.compareTo(create) != 0) {
                    this._idleTimer.markEventConsumed();
                    if (this._currentOffset instanceof LongMsgOffset) {
                        this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, ((LongMsgOffset) this._currentOffset).getOffset());
                    }
                    create = this._streamPartitionMsgOffsetFactory.create(this._currentOffset);
                } else if (processStreamEvents) {
                    if (this._segmentLogger.isDebugEnabled()) {
                        this._segmentLogger.debug("No messages processed before end criteria was reached. Staying at offset {}", this._currentOffset);
                    }
                } else if (fetchMessages.getUnfilteredMessageCount() > 0) {
                    this._idleTimer.markEventConsumed();
                    StreamPartitionMsgOffset offsetOfNextBatch = fetchMessages.getOffsetOfNextBatch();
                    if (this._segmentLogger.isDebugEnabled()) {
                        this._segmentLogger.debug("Skipped empty batch. Advancing from {} to {}", this._currentOffset, offsetOfNextBatch);
                    }
                    this._currentOffset = offsetOfNextBatch;
                    create = this._streamPartitionMsgOffsetFactory.create(offsetOfNextBatch);
                } else {
                    long timeSinceStreamLastCreatedOrConsumedMs = this._idleTimer.getTimeSinceStreamLastCreatedOrConsumedMs();
                    if (idleTimeoutMillis >= 0 && timeSinceStreamLastCreatedOrConsumedMs > idleTimeoutMillis) {
                        recreateStreamConsumer(String.format("Total idle time: %d ms exceeded idle timeout: %d ms", Long.valueOf(timeSinceStreamLastCreatedOrConsumedMs), Long.valueOf(idleTimeoutMillis)));
                        this._idleTimer.markStreamCreated();
                    }
                }
            } catch (PermanentConsumerException e) {
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
                this._serverMetrics.addMeteredTableValue(this._tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
                this._segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", (Throwable) e);
                throw e;
            } catch (Exception e2) {
                handleTransientStreamErrors(e2);
            } catch (Throwable th) {
                this._segmentLogger.warn("Stream error when fetching messages, stopping consumption", th);
                throw th;
            }
            if (processStreamEvents) {
                break;
            }
        }
        if (this._numRowsErrored <= 0) {
            return true;
        }
        this._serverMetrics.addMeteredTableValue(this._clientId, ServerMeter.ROWS_WITH_ERRORS, this._numRowsErrored);
        this._serverMetrics.addMeteredTableValue(this._tableStreamName, ServerMeter.ROWS_WITH_ERRORS, this._numRowsErrored);
        return true;
    }

    private boolean processStreamEvents(MessageBatch messageBatch, long j) {
        int messageCount = messageBatch.getMessageCount();
        this._partitionRateLimiter.throttle(messageCount);
        this._serverRateLimiter.throttle(messageCount);
        PinotMeter pinotMeter = null;
        PinotMeter pinotMeter2 = null;
        PinotMeter pinotMeter3 = null;
        PinotMeter pinotMeter4 = null;
        int i = 0;
        int i2 = 0;
        boolean z = true;
        TransformPipeline.Result result = new TransformPipeline.Result();
        boolean z2 = false;
        int i3 = 0;
        while (true) {
            if (i3 >= messageCount) {
                break;
            }
            z2 = this._shouldStop || endCriteriaReached();
            if (!z2) {
                if (!z) {
                    this._segmentLogger.error("Buffer full with {} rows consumed (row limit {}, indexed {})", Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed), Integer.valueOf(this._segmentMaxRowCount));
                    throw new RuntimeException("Realtime segment full");
                }
                StreamMessage streamMessage = messageBatch.getStreamMessage(i3);
                StreamDataDecoderResult decode = this._streamDataDecoder.decode(streamMessage);
                StreamMessageMetadata metadata = streamMessage.getMetadata();
                StreamPartitionMsgOffset streamPartitionMsgOffset = null;
                StreamPartitionMsgOffset streamPartitionMsgOffset2 = null;
                if (metadata != null) {
                    streamPartitionMsgOffset = metadata.getOffset();
                    streamPartitionMsgOffset2 = metadata.getNextOffset();
                }
                if (streamPartitionMsgOffset2 == null) {
                    streamPartitionMsgOffset2 = messageBatch.getNextStreamPartitionMsgOffsetAtIndex(i3);
                }
                if (decode.getException() != null) {
                    pinotMeter2 = this._serverMetrics.addMeteredTableValue(this._clientId, (String) ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1L, pinotMeter2);
                    this._numRowsErrored++;
                } else {
                    try {
                        this._recordEnricherPipeline.run(decode.getResult());
                        this._transformPipeline.processRow(decode.getResult(), result);
                    } catch (Exception e) {
                        this._numRowsErrored++;
                        result.getTransformedRows().clear();
                        String format = String.format("Caught exception while transforming the record at offset: %s , row: %s", streamPartitionMsgOffset, decode.getResult());
                        this._segmentLogger.error(format, (Throwable) e);
                        this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format, e));
                    }
                    if (result.getSkippedRowCount() > 0) {
                        pinotMeter2 = this._serverMetrics.addMeteredTableValue(this._clientId, (String) ServerMeter.REALTIME_ROWS_FILTERED, result.getSkippedRowCount(), pinotMeter2);
                        if (this._trackFilteredMessageOffsets) {
                            this._filteredMessageOffsets.add(streamPartitionMsgOffset.toString());
                        }
                    }
                    if (result.getIncompleteRowCount() > 0) {
                        pinotMeter3 = this._serverMetrics.addMeteredTableValue(this._clientId, (String) ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED, result.getIncompleteRowCount(), pinotMeter3);
                    }
                    if (result.getSanitizedRowCount() > 0) {
                        pinotMeter4 = this._serverMetrics.addMeteredTableValue(this._clientId, (String) ServerMeter.REALTIME_ROWS_SANITIZED, result.getSanitizedRowCount(), pinotMeter4);
                    }
                    for (GenericRow genericRow : result.getTransformedRows()) {
                        try {
                            z = this._realtimeSegment.index(genericRow, metadata);
                            i++;
                            this._lastRowMetadata = metadata;
                            this._lastConsumedTimestampMs = System.currentTimeMillis();
                            pinotMeter = this._serverMetrics.addMeteredTableValue(this._clientId, (String) ServerMeter.REALTIME_ROWS_CONSUMED, 1L, pinotMeter);
                            this._serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
                        } catch (Exception e2) {
                            this._numRowsErrored++;
                            String format2 = String.format("Caught exception while indexing the record at offset: %s , row: %s", streamPartitionMsgOffset, genericRow);
                            this._segmentLogger.error(format2, (Throwable) e2);
                            this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format2, e2));
                        }
                    }
                }
                this._currentOffset = streamPartitionMsgOffset2;
                this._numRowsIndexed = this._realtimeSegment.getNumDocsIndexed();
                this._numRowsConsumed++;
                i2++;
                i3++;
            } else if (this._segmentLogger.isDebugEnabled()) {
                this._segmentLogger.debug("stop processing message batch early shouldStop: {}", Boolean.valueOf(this._shouldStop));
            }
        }
        updateCurrentDocumentCountMetrics();
        if (messageBatch.getUnfilteredMessageCount() > 0) {
            updateIngestionMetrics(messageBatch.getLastMessageMetadata());
            this._hasMessagesFetched = true;
            if (i2 > 0 && this._segmentLogger.isDebugEnabled()) {
                this._segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", Integer.valueOf(i), Integer.valueOf(i2), this._currentOffset);
            }
        } else if (!z2) {
            setIngestionDelayToZero();
            if (this._segmentLogger.isDebugEnabled()) {
                this._segmentLogger.debug("empty batch received - sleeping for {}ms", Long.valueOf(j));
            }
            Uninterruptibles.sleepUninterruptibly(j, TimeUnit.MILLISECONDS);
        }
        return z2;
    }

    @VisibleForTesting
    protected StreamPartitionMsgOffset extractOffset(SegmentCompletionProtocol.Response response) {
        return this._streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
    }

    protected void buildSegmentForCommit(long j) {
        File segmentTarFile;
        try {
            if (this._segmentBuildDescriptor == null || this._segmentBuildDescriptor.getOffset().compareTo(this._currentOffset) != 0 || (segmentTarFile = this._segmentBuildDescriptor.getSegmentTarFile()) == null || !segmentTarFile.exists()) {
                removeSegmentFile();
                if (j <= 0) {
                    j = this._segBuildSemaphore == null ? SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds() * 1000 : 30000L;
                }
                this._leaseExtender.addSegment(this._segmentNameStr, j, this._currentOffset);
                this._segmentBuildDescriptor = buildSegmentInternal(true);
                this._leaseExtender.removeSegment(this._segmentNameStr);
            }
        } finally {
            this._leaseExtender.removeSegment(this._segmentNameStr);
        }
    }

    public Map<String, String> getPartitionToCurrentOffset() {
        return Collections.singletonMap(String.valueOf(this._partitionGroupId), this._currentOffset.toString());
    }

    public CommonConstants.ConsumerState getConsumerState() {
        return this._state == State.ERROR ? CommonConstants.ConsumerState.NOT_CONSUMING : CommonConstants.ConsumerState.CONSUMING;
    }

    public long getLastConsumedTimestamp() {
        return this._lastConsumedTimestampMs;
    }

    public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
        String valueOf = String.valueOf(this._partitionGroupId);
        return Collections.singletonMap(valueOf, new ConsumerPartitionState(valueOf, getCurrentOffset(), getLastConsumedTimestamp(), fetchLatestStreamOffset(5000L), this._lastRowMetadata));
    }

    public Map<String, PartitionLagState> getPartitionToLagState(Map<String, ConsumerPartitionState> map) {
        if (this._partitionMetadataProvider == null) {
            createPartitionMetadataProvider("Get Partition Lag State");
        }
        return this._partitionMetadataProvider.getCurrentPartitionLagState(map);
    }

    private void reportDataLoss(MessageBatch messageBatch) {
        if (messageBatch.hasDataLoss()) {
            this._serverMetrics.addMeteredTableValue(this._tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
            String format = String.format("Message loss detected in stream partition: %s for table: %s startOffset: %s batchFirstOffset: %s", Integer.valueOf(this._partitionGroupId), this._tableNameWithType, this._startOffset, messageBatch.getFirstMessageOffset());
            this._segmentLogger.error(format);
            this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format, (Exception) null));
        }
    }

    public StreamPartitionMsgOffset getCurrentOffset() {
        return this._currentOffset;
    }

    public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() {
        return this._latestStreamOffsetAtStartupTime;
    }

    @VisibleForTesting
    SegmentBuildDescriptor getSegmentBuildDescriptor() {
        return this._segmentBuildDescriptor;
    }

    @VisibleForTesting
    Semaphore getPartitionGroupConsumerSemaphore() {
        return this._partitionGroupConsumerSemaphore;
    }

    @VisibleForTesting
    AtomicBoolean getAcquiredConsumerSemaphore() {
        return this._acquiredConsumerSemaphore;
    }

    @VisibleForTesting
    SegmentBuildDescriptor buildSegmentInternal(boolean z) {
        if (this._allowConsumptionDuringCommit) {
            closeStreamConsumers();
        }
        try {
            if (this._realtimeTableDataManager.isShutDown()) {
                this._segmentLogger.warn("Table data manager is already shut down");
                return null;
            }
            try {
                long now = now();
                if (this._segBuildSemaphore != null) {
                    this._segmentLogger.info("Waiting to acquire semaphore for building segment");
                    this._segBuildSemaphore.acquire();
                }
                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L);
                long now2 = now();
                File file = new File(this._resourceTmpDir, "tmp-" + this._segmentNameStr + "-" + now());
                SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
                segmentZKPropsConfig.setStartOffset(this._segmentZKMetadata.getStartOffset());
                segmentZKPropsConfig.setEndOffset(this._currentOffset.toString());
                RealtimeSegmentConverter realtimeSegmentConverter = new RealtimeSegmentConverter(this._realtimeSegment, segmentZKPropsConfig, file.getAbsolutePath(), this._schema, this._tableNameWithType, this._tableConfig, this._segmentZKMetadata.getSegmentName(), this._columnIndicesForRealtimeTable, this._nullHandlingEnabled);
                this._segmentLogger.info("Trying to build segment");
                try {
                    realtimeSegmentConverter.build(this._segmentVersion, this._serverMetrics);
                    long now3 = now() - now2;
                    long j = now2 - now;
                    this._segmentLogger.info("Successfully built segment (Column Mode: {}) in {} ms, after lockWaitTime {} ms", Boolean.valueOf(realtimeSegmentConverter.isColumnMajorEnabled()), Long.valueOf(now3), Long.valueOf(j));
                    File file2 = new File(this._resourceDataDir);
                    File file3 = new File(file2, this._segmentNameStr);
                    FileUtils.deleteQuietly(file3);
                    File[] listFiles = file.listFiles();
                    if (!$assertionsDisabled && listFiles == null) {
                        throw new AssertionError();
                    }
                    File file4 = listFiles[0];
                    try {
                        try {
                            FileUtils.moveDirectory(file4, file3);
                            FileUtils.deleteQuietly(file);
                            long sizeOfDirectory = FileUtils.sizeOfDirectory(file3);
                            this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS, TimeUnit.MILLISECONDS.toSeconds(now3));
                            this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS, TimeUnit.MILLISECONDS.toSeconds(j));
                            if (!z) {
                                SegmentBuildDescriptor segmentBuildDescriptor = new SegmentBuildDescriptor(null, null, this._currentOffset, now3, j, sizeOfDirectory);
                                if (this._segBuildSemaphore != null) {
                                    this._segBuildSemaphore.release();
                                }
                                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                                return segmentBuildDescriptor;
                            }
                            File file5 = new File(file2, this._segmentNameStr + ".tar.gz");
                            try {
                                TarGzCompressionUtils.createTarGzFile(file3, file5);
                                File findMetadataFile = SegmentDirectoryPaths.findMetadataFile(file3);
                                if (findMetadataFile == null) {
                                    String format = String.format("Failed to find file: %s under index directory: %s", V1Constants.MetadataKeys.METADATA_FILE_NAME, file3);
                                    this._segmentLogger.error(format);
                                    this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format, (Exception) null));
                                    if (this._segBuildSemaphore != null) {
                                        this._segBuildSemaphore.release();
                                    }
                                    this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                                    return null;
                                }
                                File findCreationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(file3);
                                if (findCreationMetaFile == null) {
                                    String format2 = String.format("Failed to find file: %s under index directory: %s", V1Constants.SEGMENT_CREATION_META, file3);
                                    this._segmentLogger.error(format2);
                                    this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format2, (Exception) null));
                                    if (this._segBuildSemaphore != null) {
                                        this._segBuildSemaphore.release();
                                    }
                                    this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                                    return null;
                                }
                                HashMap hashMap = new HashMap();
                                hashMap.put(V1Constants.MetadataKeys.METADATA_FILE_NAME, findMetadataFile);
                                hashMap.put(V1Constants.SEGMENT_CREATION_META, findCreationMetaFile);
                                SegmentBuildDescriptor segmentBuildDescriptor2 = new SegmentBuildDescriptor(file5, hashMap, this._currentOffset, now3, j, sizeOfDirectory);
                                if (this._segBuildSemaphore != null) {
                                    this._segBuildSemaphore.release();
                                }
                                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                                return segmentBuildDescriptor2;
                            } catch (IOException e) {
                                String format3 = String.format("Caught exception while taring index directory from: %s to: %s", file3, file5);
                                this._segmentLogger.error(format3, (Throwable) e);
                                this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format3, e));
                                if (this._segBuildSemaphore != null) {
                                    this._segBuildSemaphore.release();
                                }
                                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                                return null;
                            }
                        } catch (IOException e2) {
                            String format4 = String.format("Caught exception while moving index directory from: %s to: %s", file4, file3);
                            this._segmentLogger.error(format4, (Throwable) e2);
                            this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), format4, e2));
                            FileUtils.deleteQuietly(file);
                            if (this._segBuildSemaphore != null) {
                                this._segBuildSemaphore.release();
                            }
                            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                            return null;
                        }
                    } catch (Throwable th) {
                        FileUtils.deleteQuietly(file);
                        throw th;
                    }
                } catch (Exception e3) {
                    this._segmentLogger.error("Could not build segment", (Throwable) e3);
                    FileUtils.deleteQuietly(file);
                    this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), "Could not build segment", e3));
                    if (this._segBuildSemaphore != null) {
                        this._segBuildSemaphore.release();
                    }
                    this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                    return null;
                }
            } catch (InterruptedException e4) {
                this._segmentLogger.error("Interrupted while waiting for semaphore", (Throwable) e4);
                this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), "Interrupted while waiting for semaphore", e4));
                if (this._segBuildSemaphore != null) {
                    this._segBuildSemaphore.release();
                }
                this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
                return null;
            }
        } catch (Throwable th2) {
            if (this._segBuildSemaphore != null) {
                this._segBuildSemaphore.release();
            }
            this._serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, -1L);
            throw th2;
        }
    }

    @VisibleForTesting
    boolean commitSegment(String str) throws Exception {
        File segmentTarFile = this._segmentBuildDescriptor.getSegmentTarFile();
        Preconditions.checkState(segmentTarFile != null && segmentTarFile.exists(), "Segment tar file: %s does not exist", segmentTarFile);
        SegmentCompletionProtocol.Response commit = commit(str);
        if (commit.getStatus() != SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS) {
            this._segmentLogger.warn("Controller response was {} and not {}", commit.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
            return false;
        }
        this._realtimeTableDataManager.replaceConsumingSegment(this._segmentNameStr);
        removeSegmentFile();
        return true;
    }

    @VisibleForTesting
    SegmentCompletionProtocol.Response commit(String str) {
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withSegmentName(this._segmentNameStr).withStreamPartitionMsgOffset(this._currentOffset.toString()).withNumRows(this._numRowsConsumed).withInstanceId(this._instanceId).withReason(this._stopReason).withBuildTimeMillis(this._segmentBuildDescriptor.getBuildTimeMillis()).withSegmentSizeBytes(this._segmentBuildDescriptor.getSegmentSizeBytes()).withWaitTimeMillis(this._segmentBuildDescriptor.getWaitTimeMillis());
        if (this._isOffHeap) {
            params.withMemoryUsedBytes(this._memoryManager.getTotalAllocatedBytes());
        }
        try {
            return this._segmentCommitterFactory.createSegmentCommitter(params, str).commit(this._segmentBuildDescriptor);
        } catch (URISyntaxException e) {
            this._segmentLogger.error("Failed to create a segment committer: ", (Throwable) e);
            return SegmentCompletionProtocol.RESP_NOT_SENT;
        }
    }

    protected boolean buildSegmentAndReplace() throws Exception {
        if (buildSegmentInternal(false) == null) {
            return false;
        }
        this._realtimeTableDataManager.replaceConsumingSegment(this._segmentNameStr);
        return true;
    }

    private void closeStreamConsumers() {
        closePartitionGroupConsumer();
        closePartitionMetadataProvider();
        if (this._acquiredConsumerSemaphore.compareAndSet(true, false)) {
            this._partitionGroupConsumerSemaphore.release();
        }
    }

    private void closePartitionGroupConsumer() {
        try {
            this._partitionGroupConsumer.close();
        } catch (Exception e) {
            this._segmentLogger.warn("Could not close stream consumer", (Throwable) e);
        }
    }

    private void closePartitionMetadataProvider() {
        if (this._partitionMetadataProvider != null) {
            try {
                this._partitionMetadataProvider.close();
            } catch (Exception e) {
                this._segmentLogger.warn("Could not close stream metadata provider", (Throwable) e);
            }
        }
    }

    private void cleanupMetrics() {
        this._serverMetrics.removeTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING);
    }

    protected void hold() throws InterruptedException {
        Thread.sleep(3000L);
    }

    protected void postStopConsumedMsg(String str) {
        ConsumptionStopIndicator consumptionStopIndicator = new ConsumptionStopIndicator(this._currentOffset, this._segmentNameStr, this._instanceId, this._protocolHandler, str, this._segmentLogger);
        do {
            SegmentCompletionProtocol.Response postSegmentStoppedConsuming = consumptionStopIndicator.postSegmentStoppedConsuming();
            if (postSegmentStoppedConsuming.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
                return;
            }
            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.SECONDS);
            this._segmentLogger.info("Retrying after response {}", postSegmentStoppedConsuming.toJsonString());
        } while (!this._shouldStop);
    }

    protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withStreamPartitionMsgOffset(this._currentOffset.toString()).withSegmentName(this._segmentNameStr).withReason(this._stopReason).withNumRows(this._numRowsConsumed).withInstanceId(this._instanceId);
        if (this._isOffHeap) {
            params.withMemoryUsedBytes(this._memoryManager.getTotalAllocatedBytes());
        }
        return this._protocolHandler.segmentConsumed(params);
    }

    private void removeSegmentFile() {
        if (this._segmentBuildDescriptor != null) {
            this._segmentBuildDescriptor.deleteSegmentFile();
            this._segmentBuildDescriptor = null;
        }
    }

    public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata) throws InterruptedException {
        this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
        try {
            try {
                removeSegmentFile();
                this._leaseExtender.removeSegment(this._segmentNameStr);
                StreamPartitionMsgOffset create = this._streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
                this._segmentLogger.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", this._state, this._startOffset, create);
                stop();
                this._segmentLogger.info("Consumer thread stopped in state {}", this._state);
                switch (this._state) {
                    case INITIAL_CONSUMING:
                    case CATCHING_UP:
                    case HOLDING:
                        switch (this._segmentCompletionMode) {
                            case DOWNLOAD:
                                this._segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", this._state.toString(), this._segmentCompletionMode);
                                downloadSegmentAndReplace(segmentZKMetadata);
                                break;
                            case DEFAULT:
                                if (this._currentOffset.compareTo(create) <= 0) {
                                    if (this._currentOffset.compareTo(create) != 0) {
                                        this._segmentLogger.info("Attempting to catch up from offset {} to {} ", this._currentOffset, create);
                                        if (catchupToFinalOffset(create, TimeUnit.MILLISECONDS.convert(31L, TimeUnit.SECONDS))) {
                                            this._segmentLogger.info("Caught up to offset {}", this._currentOffset);
                                            buildSegmentAndReplace();
                                        } else {
                                            this._segmentLogger.info("Could not catch up to offset (current = {}). Downloading to replace", this._currentOffset);
                                            downloadSegmentAndReplace(segmentZKMetadata);
                                        }
                                        break;
                                    } else {
                                        this._segmentLogger.info("Current offset {} matches offset in zk {}. Replacing segment", this._currentOffset, create);
                                        buildSegmentAndReplace();
                                        break;
                                    }
                                } else {
                                    this._segmentLogger.warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", this._currentOffset, create);
                                    downloadSegmentAndReplace(segmentZKMetadata);
                                    break;
                                }
                        }
                    case CONSUMING_TO_ONLINE:
                    default:
                        this._segmentLogger.info("Downloading to replace segment while in state {}", this._state.toString());
                        downloadSegmentAndReplace(segmentZKMetadata);
                        break;
                    case COMMITTED:
                    case RETAINED:
                        this._segmentLogger.info("State {}. Nothing to do", this._state.toString());
                        break;
                    case DISCARDED:
                    case ERROR:
                        this._segmentLogger.info("State {}. Downloading to replace", this._state.toString());
                        downloadSegmentAndReplace(segmentZKMetadata);
                        break;
                }
                this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            } catch (Exception e) {
                Utils.rethrowException(e);
                this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            }
        } catch (Throwable th) {
            this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            throw th;
        }
    }

    protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) throws Exception {
        if (this._allowConsumptionDuringCommit) {
            closeStreamConsumers();
        }
        this._realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
    }

    protected long now() {
        return System.currentTimeMillis();
    }

    private boolean catchupToFinalOffset(StreamPartitionMsgOffset streamPartitionMsgOffset, long j) {
        this._finalOffset = streamPartitionMsgOffset;
        this._consumeEndTime = now() + j;
        this._state = State.CONSUMING_TO_ONLINE;
        this._shouldStop = false;
        try {
            try {
                consumeLoop();
                this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
                if (this._currentOffset.compareTo(streamPartitionMsgOffset) == 0) {
                    return true;
                }
                this._segmentLogger.warn("Could not consume up to {} (current offset {})", streamPartitionMsgOffset, this._currentOffset);
                return false;
            } catch (Exception e) {
                this._segmentLogger.warn("Exception when catching up to final offset", (Throwable) e);
                this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
                return false;
            }
        } catch (Throwable th) {
            this._serverMetrics.setValueOfTableGauge(this._clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0L);
            throw th;
        }
    }

    @Override // org.apache.pinot.segment.local.data.manager.SegmentDataManager
    public void doOffload() {
        try {
            stop();
        } catch (Exception e) {
            this._segmentLogger.error("Caught exception while stopping the consumer thread", (Throwable) e);
        }
        closeStreamConsumers();
        cleanupMetrics();
        this._realtimeSegment.offload();
    }

    @Override // org.apache.pinot.segment.local.data.manager.SegmentDataManager
    protected void doDestroy() {
        this._realtimeSegment.destroy();
    }

    public void startConsumption() {
        this._consumerThread = new Thread(new PartitionConsumer(), this._segmentNameStr);
        this._segmentLogger.info("Created new consumer thread {} for {}", this._consumerThread, this);
        this._consumerThread.start();
    }

    public void stop() throws InterruptedException {
        this._shouldStop = true;
        if (Thread.currentThread() == this._consumerThread || !this._consumerThread.isAlive()) {
            return;
        }
        this._consumerThread.interrupt();
        this._consumerThread.join();
    }

    public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String str, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName lLCSegmentName, Semaphore semaphore, ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager, @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager, BooleanSupplier booleanSupplier) throws AttemptsExceededException, RetriableOperationException {
        String str2;
        this._trackFilteredMessageOffsets = false;
        this._segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
        this._segmentZKMetadata = segmentZKMetadata;
        this._tableConfig = tableConfig;
        this._tableNameWithType = this._tableConfig.getTableName();
        this._realtimeTableDataManager = realtimeTableDataManager;
        this._resourceDataDir = str;
        this._indexLoadingConfig = indexLoadingConfig;
        this._schema = schema;
        this._serverMetrics = serverMetrics;
        this._partitionUpsertMetadataManager = partitionUpsertMetadataManager;
        this._isReadyToConsumeData = booleanSupplier;
        this._segmentVersion = indexLoadingConfig.getSegmentVersion();
        this._instanceId = this._realtimeTableDataManager.getInstanceId();
        this._leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(this._tableNameWithType);
        this._protocolHandler = new ServerSegmentCompletionProtocolHandler(this._serverMetrics, this._tableNameWithType);
        CompletionConfig completionConfig = this._tableConfig.getValidationConfig().getCompletionConfig();
        this._segmentCompletionMode = (completionConfig == null || !CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD.toString().equalsIgnoreCase(completionConfig.getCompletionMode())) ? CommonConstants.Segment.Realtime.CompletionMode.DEFAULT : CommonConstants.Segment.Realtime.CompletionMode.DOWNLOAD;
        String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        IndexingConfig indexingConfig = this._tableConfig.getIndexingConfig();
        this._streamConfig = new StreamConfig(this._tableNameWithType, IngestionConfigUtils.getStreamConfigMap(this._tableConfig));
        this._streamConsumerFactory = StreamConsumerFactoryProvider.create(this._streamConfig);
        this._streamPartitionMsgOffsetFactory = this._streamConsumerFactory.createStreamMsgOffsetFactory();
        String topicName = this._streamConfig.getTopicName();
        this._segmentNameStr = this._segmentZKMetadata.getSegmentName();
        this._partitionGroupId = lLCSegmentName.getPartitionGroupId();
        this._partitionGroupConsumptionStatus = new PartitionGroupConsumptionStatus(this._partitionGroupId, lLCSegmentName.getSequenceNumber(), this._streamPartitionMsgOffsetFactory.create(this._segmentZKMetadata.getStartOffset()), this._segmentZKMetadata.getEndOffset() == null ? null : this._streamPartitionMsgOffsetFactory.create(this._segmentZKMetadata.getEndOffset()), this._segmentZKMetadata.getStatus().toString());
        this._partitionGroupConsumerSemaphore = semaphore;
        this._acquiredConsumerSemaphore = new AtomicBoolean(false);
        InstanceDataManagerConfig instanceDataManagerConfig = this._indexLoadingConfig.getInstanceDataManagerConfig();
        String consumerClientIdSuffix = instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
        if (StringUtils.isNotBlank(consumerClientIdSuffix)) {
            this._clientId = this._tableNameWithType + "-" + topicName + "-" + this._partitionGroupId + "-" + consumerClientIdSuffix;
        } else {
            this._clientId = this._tableNameWithType + "-" + topicName + "-" + this._partitionGroupId;
        }
        this._segmentLogger = LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" + this._segmentNameStr);
        this._tableStreamName = this._tableNameWithType + "_" + topicName;
        if (!this._indexLoadingConfig.isRealtimeOffHeapAllocation() || this._indexLoadingConfig.isDirectRealtimeOffHeapAllocation()) {
            this._memoryManager = new DirectMemoryManager(this._segmentNameStr, this._serverMetrics);
        } else {
            this._memoryManager = new MmapMemoryManager(this._realtimeTableDataManager.getConsumerDir(), this._segmentNameStr, this._serverMetrics);
        }
        this._partitionRateLimiter = RealtimeConsumptionRateManager.getInstance().createRateLimiter(this._streamConfig, this._tableNameWithType, this._serverMetrics, this._clientId);
        this._serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
        if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
            this._trackFilteredMessageOffsets = tableConfig.getIngestionConfig().getStreamIngestionConfig().isTrackFilteredMessageOffsets();
        }
        List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
        if (sortedColumns.isEmpty()) {
            this._segmentLogger.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}", lLCSegmentName);
            str2 = null;
        } else {
            String str3 = sortedColumns.get(0);
            if (this._schema.hasColumn(str3)) {
                this._segmentLogger.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}", str3, lLCSegmentName);
                str2 = str3;
            } else {
                this._segmentLogger.warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.", str3, lLCSegmentName);
                str2 = null;
            }
        }
        if (str2 != null) {
            indexLoadingConfig.addInvertedIndexColumns(str2);
        }
        int sizeThresholdToFlushSegment = segmentZKMetadata.getSizeThresholdToFlushSegment();
        sizeThresholdToFlushSegment = sizeThresholdToFlushSegment <= 0 ? this._streamConfig.getFlushThresholdRows() : sizeThresholdToFlushSegment;
        this._segmentMaxRowCount = sizeThresholdToFlushSegment <= 0 ? 5000000 : sizeThresholdToFlushSegment;
        this._isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
        this._nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
        this._columnIndicesForRealtimeTable = new ColumnIndicesForRealtimeTable(str2, new ArrayList(indexLoadingConfig.getInvertedIndexColumns()), new ArrayList(indexLoadingConfig.getTextIndexColumns()), new ArrayList(indexLoadingConfig.getFSTIndexColumns()), new ArrayList(indexLoadingConfig.getNoDictionaryColumns()), new ArrayList(indexLoadingConfig.getVarLengthDictionaryColumns()));
        RealtimeSegmentConfig.Builder fieldConfigList = new RealtimeSegmentConfig.Builder(indexLoadingConfig).setTableNameWithType(this._tableNameWithType).setSegmentName(this._segmentNameStr).setStreamName(topicName).setSchema(this._schema).setTimeColumnName(timeColumnName).setCapacity(this._segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount()).setSegmentZKMetadata(segmentZKMetadata).setOffHeap(this._isOffHeap).setMemoryManager(this._memoryManager).setStatsHistory(realtimeTableDataManager.getStatsHistory()).setAggregateMetrics(indexingConfig.isAggregateMetrics()).setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig)).setNullHandlingEnabled(this._nullHandlingEnabled).setConsumerDir(realtimeTableDataManager.getConsumerDir()).setUpsertMode(tableConfig.getUpsertMode()).setPartitionUpsertMetadataManager(partitionUpsertMetadataManager).setPartitionDedupMetadataManager(partitionDedupMetadataManager).setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns()).setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn()).setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn()).setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord()).setFieldConfigList(tableConfig.getFieldConfigList());
        Set<String> fieldsForRecordExtractor = IngestionUtils.getFieldsForRecordExtractor(this._tableConfig.getIngestionConfig(), this._schema);
        ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2000000476837158d);
        AtomicReference atomicReference = new AtomicReference();
        try {
            exponentialBackoffRetryPolicy.attempt(() -> {
                try {
                    atomicReference.set(new StreamDataDecoderImpl(createMessageDecoder(fieldsForRecordExtractor)));
                    return true;
                } catch (Exception e) {
                    this._segmentLogger.warn("Failed to initialize the StreamMessageDecoder: ", (Throwable) e);
                    return false;
                }
            });
            this._streamDataDecoder = (StreamDataDecoder) atomicReference.get();
            try {
                this._recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
                this._transformPipeline = new TransformPipeline(tableConfig, schema);
                try {
                    this._partitionGroupConsumerSemaphore.acquire();
                    this._acquiredConsumerSemaphore.set(true);
                    try {
                        this._startOffset = this._partitionGroupConsumptionStatus.getStartOffset();
                        this._currentOffset = this._streamPartitionMsgOffsetFactory.create(this._startOffset);
                        makeStreamConsumer("Starting");
                        createPartitionMetadataProvider("Starting");
                        setPartitionParameters(fieldConfigList, indexingConfig.getSegmentPartitionConfig());
                        this._realtimeSegment = new MutableSegmentImpl(fieldConfigList.build(), serverMetrics);
                        this._resourceTmpDir = new File(str, RESOURCE_TEMP_DIR_NAME);
                        if (!this._resourceTmpDir.exists()) {
                            this._resourceTmpDir.mkdirs();
                        }
                        this._state = State.INITIAL_CONSUMING;
                        this._latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000L);
                        this._consumeStartTime = now();
                        setConsumeEndTime(segmentZKMetadata, this._consumeStartTime);
                        this._segmentCommitterFactory = new SegmentCommitterFactory(this._segmentLogger, this._protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
                        this._segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", lLCSegmentName, Integer.valueOf(this._segmentMaxRowCount), new DateTime(this._consumeEndTime, DateTimeZone.UTC));
                        this._allowConsumptionDuringCommit = !this._realtimeTableDataManager.isPartialUpsertEnabled() ? true : this._tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit();
                    } catch (Exception e) {
                        this._partitionGroupConsumerSemaphore.release();
                        this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize segment data manager", e));
                        this._segmentLogger.warn("Scheduling task to call controller to mark the segment as OFFLINE in Ideal State due to initialization error: '{}'", e.getMessage());
                        new Thread(() -> {
                            ConsumptionStopIndicator consumptionStopIndicator = new ConsumptionStopIndicator(this._currentOffset, this._segmentNameStr, this._instanceId, this._protocolHandler, "Consuming segment initialization error", this._segmentLogger);
                            try {
                                Thread.sleep(30000L);
                                consumptionStopIndicator.postSegmentStoppedConsuming();
                            } catch (InterruptedException e2) {
                            }
                        }).start();
                        throw e;
                    }
                } catch (InterruptedException e2) {
                    this._segmentLogger.error("InterruptedException when acquiring the partitionConsumerSemaphore");
                    throw new RuntimeException("InterruptedException when acquiring the partitionConsumerSemaphore" + " for segment: " + this._segmentNameStr);
                }
            } catch (Exception e3) {
                this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e3));
                throw e3;
            }
        } catch (Exception e4) {
            this._realtimeTableDataManager.addSegmentError(this._segmentNameStr, new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e4));
            throw e4;
        }
    }

    private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long j) {
        long flushThresholdTimeMillis = this._streamConfig.getFlushThresholdTimeMillis();
        this._consumeEndTime = segmentZKMetadata.getCreationTime() + flushThresholdTimeMillis;
        long min = Math.min(flushThresholdTimeMillis, TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES));
        if (this._consumeEndTime - j < min) {
            this._consumeEndTime = j + min;
        }
    }

    public long getTimeSinceEventLastConsumedMs() {
        return this._idleTimer.getTimeSinceEventLastConsumedMs();
    }

    public StreamPartitionMsgOffset fetchLatestStreamOffset(long j, boolean z) {
        return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, j, z);
    }

    public StreamPartitionMsgOffset fetchLatestStreamOffset(long j) {
        return fetchLatestStreamOffset(j, false);
    }

    public StreamPartitionMsgOffset fetchEarliestStreamOffset(long j, boolean z) {
        return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, j, z);
    }

    public StreamPartitionMsgOffset fetchEarliestStreamOffset(long j) {
        return fetchEarliestStreamOffset(j, false);
    }

    private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long j, boolean z) {
        if (this._partitionMetadataProvider == null) {
            createPartitionMetadataProvider("Fetch latest stream offset");
        }
        try {
            return this._partitionMetadataProvider.fetchStreamPartitionOffset(offsetCriteria, j);
        } catch (Exception e) {
            String format = String.format("Cannot fetch stream offset with criteria %s for clientId %s and partitionGroupId %d with maxWaitTime %d", offsetCriteria, this._clientId, Integer.valueOf(this._partitionGroupId), Long.valueOf(j));
            if (z) {
                this._segmentLogger.debug(format, (Throwable) e);
                return null;
            }
            this._segmentLogger.warn(format, (Throwable) e);
            return null;
        }
    }

    private void setPartitionParameters(RealtimeSegmentConfig.Builder builder, SegmentPartitionConfig segmentPartitionConfig) {
        if (segmentPartitionConfig != null) {
            Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
            if (columnPartitionMap.size() != 1) {
                this._segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
                return;
            }
            Map.Entry<String, ColumnPartitionConfig> next = columnPartitionMap.entrySet().iterator().next();
            String key = next.getKey();
            ColumnPartitionConfig value = next.getValue();
            String functionName = value.getFunctionName();
            int numPartitions = value.getNumPartitions();
            try {
                int size = this._partitionMetadataProvider.computePartitionGroupMetadata(this._clientId, this._streamConfig, Collections.emptyList(), 5000).size();
                if (size != numPartitions) {
                    this._segmentLogger.info("Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", Integer.valueOf(size), Integer.valueOf(numPartitions));
                    numPartitions = size;
                }
            } catch (Exception e) {
                this._segmentLogger.warn("Failed to get number of stream partitions in 5s, using number of partitions in the partition config: {}", Integer.valueOf(numPartitions), e);
                createPartitionMetadataProvider("Timeout getting number of stream partitions");
            }
            builder.setPartitionColumn(key);
            builder.setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, null));
            builder.setPartitionId(this._partitionGroupId);
        }
    }

    private void makeStreamConsumer(String str) {
        if (this._partitionGroupConsumer != null) {
            closePartitionGroupConsumer();
        }
        this._segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", this._clientId, str);
        try {
            this._partitionGroupConsumer = this._streamConsumerFactory.createPartitionGroupConsumer(this._clientId, this._partitionGroupConsumptionStatus);
            this._partitionGroupConsumer.start(this._currentOffset);
        } catch (Exception e) {
            this._segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {} reason {}", this._clientId, str, e);
            this._serverMetrics.addMeteredTableValue(this._clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
            throw e;
        }
    }

    private void recreateStreamConsumer(String str) {
        this._segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", this._clientId, str);
        this._currentOffset = this._partitionGroupConsumer.checkpoint(this._currentOffset);
        closePartitionGroupConsumer();
        try {
            this._partitionGroupConsumer = this._streamConsumerFactory.createPartitionGroupConsumer(this._clientId, this._partitionGroupConsumptionStatus);
            this._partitionGroupConsumer.start(this._currentOffset);
        } catch (Exception e) {
            this._segmentLogger.error("Faced exception while trying to recreate stream consumer for topic partition {}", this._clientId, e);
            this._serverMetrics.addMeteredTableValue(this._clientId, ServerMeter.STREAM_CONSUMER_CREATE_EXCEPTIONS, 1L);
            throw e;
        }
    }

    private void createPartitionMetadataProvider(String str) {
        closePartitionMetadataProvider();
        this._segmentLogger.info("Creating new partition metadata provider, reason: {}", str);
        this._partitionMetadataProvider = this._streamConsumerFactory.createPartitionMetadataProvider(this._clientId, this._partitionGroupId);
    }

    private void updateIngestionMetrics(RowMetadata rowMetadata) {
        if (rowMetadata != null) {
            try {
                this._realtimeTableDataManager.updateIngestionMetrics(rowMetadata.getRecordIngestionTimeMs(), rowMetadata.getFirstStreamRecordIngestionTimeMs(), rowMetadata.getOffset(), fetchLatestStreamOffset(5000L, true), this._partitionGroupId);
            } catch (Exception e) {
                this._segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", (Throwable) e);
            }
        }
    }

    private void setIngestionDelayToZero() {
        long currentTimeMillis = System.currentTimeMillis();
        this._realtimeTableDataManager.updateIngestionMetrics(currentTimeMillis, currentTimeMillis, null, null, this._partitionGroupId);
    }

    private void updateCurrentDocumentCountMetrics() {
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.DOCUMENT_COUNT, this._numRowsIndexed - this._lastUpdatedRowsIndexed.get());
        this._lastUpdatedRowsIndexed.set(this._numRowsIndexed);
        long now = now();
        int i = this._numRowsConsumed - this._lastConsumedCount;
        long j = this._lastLogTime == 0 ? this._consumeStartTime : this._lastLogTime;
        if (now - j > TimeUnit.MINUTES.toMillis(1L) || i >= 100000) {
            this._segmentLogger.info("Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}", Integer.valueOf(i), Float.valueOf((i * 1000.0f) / ((float) (now - j))), this._currentOffset, Integer.valueOf(this._numRowsConsumed), Integer.valueOf(this._numRowsIndexed));
            if (this._filteredMessageOffsets.size() > 0) {
                if (this._trackFilteredMessageOffsets) {
                    this._segmentLogger.info("Filtered events with offsets: {}", this._filteredMessageOffsets);
                }
                this._filteredMessageOffsets.clear();
            }
            this._lastConsumedCount = this._numRowsConsumed;
            this._lastLogTime = now;
        }
    }

    private StreamMessageDecoder createMessageDecoder(Set<String> set) {
        String decoderClass = this._streamConfig.getDecoderClass();
        try {
            this._streamConfig.getDecoderProperties();
            StreamMessageDecoder streamMessageDecoder = (StreamMessageDecoder) PluginManager.get().createInstance(decoderClass);
            streamMessageDecoder.init(set, this._streamConfig, this._tableConfig, this._schema);
            return streamMessageDecoder;
        } catch (Exception e) {
            throw new RuntimeException("Caught exception while creating StreamMessageDecoder from stream config: " + this._streamConfig, e);
        }
    }

    @Override // org.apache.pinot.segment.local.data.manager.SegmentDataManager
    public MutableSegment getSegment() {
        return this._realtimeSegment;
    }

    @Override // org.apache.pinot.segment.local.data.manager.SegmentDataManager
    public String getSegmentName() {
        return this._segmentNameStr;
    }

    public void forceCommit() {
        this._forceCommitMessageReceived = true;
    }

    static {
        $assertionsDisabled = !RealtimeSegmentDataManager.class.desiredAssertionStatus();
    }
}
