package org.apache.pinot.core.data.manager.realtime;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.class */
/**
 * Segment data manager for a realtime segment fed by a high-level {@link StreamLevelConsumer}.
 *
 * <p>The constructor starts the stream consumer, creates the {@link MutableSegmentImpl}, launches a dedicated
 * indexing thread and schedules a timer task (every minute) that checks the row-count and time flush thresholds.
 * Once a threshold is hit, or the mutable segment stops accepting rows, the indexing thread does the following:
 * <ul>
 *   <li>converts the mutable segment into a segment directory under the {@code resourceDataDir} passed to the
 *       constructor;</li>
 *   <li>commits the consumer offsets;</li>
 *   <li>asks the owning {@link RealtimeTableDataManager} to replace this segment with a {@code DONE} one.</li>
 * </ul>
 */
public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class);
    private static final long ONE_MINUTE_IN_MILLSEC = 60000;
    // Back-off bounds applied after a consume/index failure so a persistent error does not flood the logs.
    private static final long MIN_EXCEPTION_SLEEP_MILLIS = 50L;
    private static final long MAX_EXCEPTION_SLEEP_MILLIS = ONE_MINUTE_IN_MILLSEC;

    private final String _tableNameWithType;
    private final String _segmentName;
    private final String _timeColumnName;
    private final TimeUnit _timeType;
    private final RecordTransformer _recordTransformer;
    private final StreamLevelConsumer _streamLevelConsumer;
    private final File _resourceTmpDir;
    private final MutableSegmentImpl _realtimeSegment;
    private final String _tableStreamName;
    private final StreamConfig _streamConfig;
    // Only read/written by the timer task (computeKeepIndexing) after construction.
    private long _segmentEndTimeThreshold;
    private final TimerTask _segmentStatusTask;
    private final ServerMetrics _serverMetrics;
    private final RealtimeTableDataManager _notifier;
    private final Thread _indexingThread;
    private final String _sortedColumn;
    private final List<String> _invertedIndexColumns;
    private final List<String> _noDictionaryColumns;
    private final List<String> _varLengthDictionaryColumns;
    private final Logger _segmentLogger;
    private final SegmentVersion _segmentVersion;
    private final long _start = System.currentTimeMillis();
    // Number of documents last added to the DOCUMENT_COUNT gauge; used to report deltas.
    private final AtomicLong _lastUpdatedRawDocuments = new AtomicLong(0);
    // Cleared by the timer task when a flush threshold is reached, or by destroy().
    private volatile boolean _keepIndexing = true;
    // Set by destroy(); makes the indexing thread exit without converting the segment.
    private volatile boolean _isShuttingDown = false;
    private PinotMeter _tableAndStreamRowsConsumed = null;
    private PinotMeter _tableRowsConsumed = null;

    /**
     * Creates the manager and immediately starts consuming.
     *
     * @param segmentZKMetadata metadata of the consuming segment; its name is used for the built segment
     * @param tableConfig table config providing the time column, stream config and indexing config
     * @param instanceZKMetadata instance metadata used to look up the consumer group id
     * @param realtimeTableDataManager owner that is asked to replace this segment once it is built
     * @param resourceDataDir directory that holds the {@code _tmp} build directory and receives the built segment
     * @param indexLoadingConfig index settings (sorted / inverted / no-dictionary / var-length columns, etc.)
     * @param schema table schema; must contain a field spec for the time column
     * @param serverMetrics metrics sink
     * @throws Exception if the stream consumer cannot be created or started
     */
    public HLRealtimeSegmentDataManager(final SegmentZKMetadata segmentZKMetadata, final TableConfig tableConfig,
            InstanceZKMetadata instanceZKMetadata, RealtimeTableDataManager realtimeTableDataManager,
            final String resourceDataDir, final IndexLoadingConfig indexLoadingConfig, final Schema schema,
            final ServerMetrics serverMetrics) throws Exception {
        _segmentVersion = indexLoadingConfig.getSegmentVersion();
        _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
        _serverMetrics = serverMetrics;
        _segmentName = segmentZKMetadata.getSegmentName();
        _tableNameWithType = tableConfig.getTableName();
        _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        // Guava Preconditions substitutes "%s" placeholders (not SLF4J-style "{}").
        Preconditions.checkNotNull(_timeColumnName,
                "Must provide valid timeColumnName in tableConfig for realtime table %s", _tableNameWithType);
        DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName);
        Preconditions.checkNotNull(timeFieldSpec, "Must provide field spec for time column %s", _timeColumnName);
        _timeType = timeFieldSpec.getFormatSpec().getColumnUnit();

        _sortedColumn = resolveSortedColumn(indexLoadingConfig.getSortedColumns(), schema);

        Set<String> invertedIndexColumns = indexLoadingConfig.getInvertedIndexColumns();
        if (_sortedColumn != null) {
            // NOTE(review): this presumably mutates the live set held by indexLoadingConfig, so the sorted column
            // also shows up for later users of that config (e.g. the replaceHLSegment call after conversion).
            // Kept as-is; confirm that is intended before switching to a private copy.
            invertedIndexColumns.add(_sortedColumn);
        }
        _invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
        _noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
        _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());

        _streamConfig = new StreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(tableConfig));
        String topicName = _streamConfig.getTopicName();
        _segmentLogger = LoggerFactory.getLogger(
                HLRealtimeSegmentDataManager.class.getName() + "_" + _segmentName + "_" + topicName);
        _segmentLogger.info("Created segment data manager with Sorted column:{}, invertedIndexColumns:{}",
                _sortedColumn, _invertedIndexColumns);
        _segmentEndTimeThreshold = _start + _streamConfig.getFlushThresholdTimeMillis();

        _resourceTmpDir = new File(resourceDataDir, "_tmp");
        if (!_resourceTmpDir.exists()) {
            _resourceTmpDir.mkdirs();
        }

        _streamLevelConsumer = StreamConsumerFactoryProvider.create(_streamConfig).createStreamLevelConsumer(
                HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + topicName, _tableNameWithType,
                IngestionUtils.getFieldsForRecordExtractor(tableConfig.getIngestionConfig(), schema),
                instanceZKMetadata.getGroupId(_tableNameWithType));
        _streamLevelConsumer.start();
        _tableStreamName = _tableNameWithType + "_" + topicName;

        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
        if (indexingConfig != null && indexingConfig.isAggregateMetrics()) {
            LOGGER.warn("Updating of metrics only supported for LLC consumer, ignoring.");
        }
        // Null-safe, consistent with the isAggregateMetrics() check above.
        final boolean nullHandlingEnabled = indexingConfig != null && indexingConfig.isNullHandlingEnabled();
        _segmentLogger.info("Started {} stream provider", _streamConfig.getType());

        int flushThresholdRows = _streamConfig.getFlushThresholdRows();
        RealtimeSegmentConfig realtimeSegmentConfig = new RealtimeSegmentConfig.Builder()
                .setTableNameWithType(_tableNameWithType)
                .setSegmentName(_segmentName)
                .setStreamName(topicName)
                .setSchema(schema)
                .setTimeColumnName(_timeColumnName)
                .setCapacity(flushThresholdRows)
                .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
                .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
                .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
                .setInvertedIndexColumns(invertedIndexColumns)
                .setSegmentZKMetadata(segmentZKMetadata)
                .setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation())
                .setMemoryManager(getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentName,
                        indexLoadingConfig.isRealtimeOffHeapAllocation(),
                        indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), serverMetrics))
                .setStatsHistory(realtimeTableDataManager.getStatsHistory())
                .setNullHandlingEnabled(nullHandlingEnabled)
                .build();
        _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
        _notifier = realtimeTableDataManager;
        LOGGER.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
                _segmentName, flushThresholdRows, new DateTime(_segmentEndTimeThreshold, DateTimeZone.UTC).toString());

        _segmentStatusTask = new TimerTask() {
            @Override
            public void run() {
                computeKeepIndexing();
            }
        };

        _indexingThread = new Thread(() -> {
            int numRowsErrored = consumeRows();
            if (_isShuttingDown) {
                _segmentLogger.info("Shutting down indexing thread!");
                return;
            }
            // Everything after consumption is guarded so that a failed build / move / commit / replace is logged
            // instead of silently terminating the thread.
            try {
                if (numRowsErrored > 0) {
                    _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.ROWS_WITH_ERRORS,
                            numRowsErrored);
                }
                convertAndCommitSegment(segmentZKMetadata, tableConfig, schema, resourceDataDir, indexLoadingConfig,
                        nullHandlingEnabled);
            } catch (Exception e) {
                _segmentLogger.error("Caught exception in the realtime indexing thread", e);
            }
        });
        _indexingThread.start();
        serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
        _segmentLogger.debug("scheduling keepIndexing timer check");
        TimerService.TIMER.schedule(_segmentStatusTask, ONE_MINUTE_IN_MILLSEC, ONE_MINUTE_IN_MILLSEC);
        _segmentLogger.info("finished scheduling keepIndexing timer check");
    }

    /**
     * Picks the first configured sorted column, provided the schema contains it.
     *
     * @return the sorted column name, or {@code null} when none is configured or it is missing from the schema
     */
    private String resolveSortedColumn(List<String> sortedColumns, Schema schema) {
        if (sortedColumns.isEmpty()) {
            LOGGER.info("RealtimeDataResourceZKMetadata contains no information about sorted column for segment {}",
                    _segmentName);
            return null;
        }
        String sortedColumn = sortedColumns.get(0);
        if (!schema.hasColumn(sortedColumn)) {
            LOGGER.warn("Sorted column name: {} from RealtimeDataResourceZKMetadata is not existed in schema for segment {}.",
                    sortedColumn, _segmentName);
            return null;
        }
        LOGGER.info("Setting sorted column name: {} from RealtimeDataResourceZKMetadata for segment {}", sortedColumn,
                _segmentName);
        return sortedColumn;
    }

    /**
     * Indexing-thread loop: pulls rows from the stream-level consumer, transforms them and indexes them into the
     * mutable segment.
     *
     * <p>The loop body runs at least once, then repeats until the segment stops accepting rows, the timer task
     * clears {@code _keepIndexing}, or {@link #destroy()} sets {@code _isShuttingDown}. Each consume or index
     * failure is counted and logged, followed by an exponential back-off sleep. An {@link Error} is logged and
     * rethrown.
     *
     * @return the number of rows that failed to be consumed or indexed
     */
    private int consumeRows() {
        boolean canTakeMore = true;
        long exceptionSleepMillis = MIN_EXCEPTION_SLEEP_MILLIS;
        int numRowsErrored = 0;
        _segmentLogger.info("Starting to collect rows");
        GenericRow reuse = new GenericRow();
        do {
            reuse.clear();
            try {
                GenericRow consumedRow;
                try {
                    consumedRow = _streamLevelConsumer.next(reuse);
                    _tableAndStreamRowsConsumed = _serverMetrics.addMeteredTableValue(_tableStreamName,
                            ServerMeter.REALTIME_ROWS_CONSUMED, 1L, _tableAndStreamRowsConsumed);
                    _tableRowsConsumed = _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED,
                            1L, _tableRowsConsumed);
                } catch (Exception e) {
                    _segmentLogger.warn("Caught exception while consuming row, sleeping for {} ms",
                            exceptionSleepMillis, e);
                    numRowsErrored++;
                    _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS,
                            1L);
                    _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
                    exceptionSleepMillis = sleepAndBackOff(exceptionSleepMillis);
                    continue;
                }
                if (consumedRow == null) {
                    continue;
                }
                try {
                    GenericRow transformedRow = _recordTransformer.transform(consumedRow);
                    if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
                        canTakeMore = _realtimeSegment.index(transformedRow, null);
                        // A successful index resets the back-off.
                        exceptionSleepMillis = MIN_EXCEPTION_SLEEP_MILLIS;
                    }
                } catch (Exception e) {
                    _segmentLogger.warn("Caught exception while indexing row, sleeping for {} ms, row contents {}",
                            exceptionSleepMillis, consumedRow, e);
                    numRowsErrored++;
                    exceptionSleepMillis = sleepAndBackOff(exceptionSleepMillis);
                }
            } catch (Error e) {
                _segmentLogger.error("Caught error in indexing thread", e);
                throw e;
            }
        } while (canTakeMore && _keepIndexing && !_isShuttingDown);
        return numRowsErrored;
    }

    /** Sleeps (uninterruptibly) for {@code sleepMillis} and returns the doubled delay, capped at one minute. */
    private static long sleepAndBackOff(long sleepMillis) {
        Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
        return Math.min(MAX_EXCEPTION_SLEEP_MILLIS, sleepMillis * 2);
    }

    /**
     * Turns the filled mutable segment into a completed segment. In order, it:
     * <ol>
     *   <li>stops the threshold timer;</li>
     *   <li>builds the segment into a temporary directory and moves it to {@code resourceDataDir/<segmentName>};</li>
     *   <li>commits the stream offsets;</li>
     *   <li>asks the owning table data manager to replace this segment.</li>
     * </ol>
     *
     * @throws Exception if building or moving the segment fails, or the consumer cannot be shut down after a failed
     *     commit
     */
    private void convertAndCommitSegment(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, Schema schema,
            String resourceDataDir, IndexLoadingConfig indexLoadingConfig, boolean nullHandlingEnabled)
            throws Exception {
        _segmentLogger.info("Indexing threshold reached, proceeding with index conversion");
        // Stop the periodic threshold check first; the segment no longer accepts rows.
        _segmentStatusTask.cancel();
        updateCurrentDocumentCountMetrics();
        _segmentLogger.info("Indexed {} raw events", _realtimeSegment.getNumDocsIndexed());

        String segmentName = segmentZKMetadata.getSegmentName();
        File tempSegmentDir = new File(_resourceTmpDir, "tmp-" + System.currentTimeMillis());
        RealtimeSegmentConverter converter = new RealtimeSegmentConverter(_realtimeSegment, null,
                tempSegmentDir.getAbsolutePath(), schema, _tableNameWithType, tableConfig, segmentName, _sortedColumn,
                _invertedIndexColumns, Collections.emptyList(), Collections.emptyList(), _noDictionaryColumns,
                _varLengthDictionaryColumns, nullHandlingEnabled);
        _segmentLogger.info("Trying to build segment");
        long buildStartNanos = System.nanoTime();
        converter.build(_segmentVersion, _serverMetrics);
        _segmentLogger.info("Built segment in {} ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - buildStartNanos));

        File destDir = new File(resourceDataDir, segmentName);
        FileUtils.deleteQuietly(destDir);
        File[] builtSegmentDirs = tempSegmentDir.listFiles();
        if (builtSegmentDirs == null || builtSegmentDirs.length == 0) {
            throw new IllegalStateException("No segment was built under " + tempSegmentDir.getAbsolutePath());
        }
        FileUtils.moveDirectory(builtSegmentDirs[0], destDir);
        FileUtils.deleteQuietly(tempSegmentDir);

        long segmentStartTime = _realtimeSegment.getMinTime();
        long segmentEndTime = _realtimeSegment.getMaxTime();
        boolean offsetsCommitted = commitOffsetsAndReleaseConsumer();
        markSegmentDone(segmentName, segmentStartTime, segmentEndTime, offsetsCommitted, indexLoadingConfig);
    }

    /**
     * Commits the consumer offsets and then shuts the consumer down. Failures are logged and metered rather than
     * thrown. If the commit itself failed, the consumer is still shut down.
     *
     * @return whether the offset commit succeeded
     * @throws Exception only if shutting down the consumer after a failed commit throws
     */
    private boolean commitOffsetsAndReleaseConsumer() throws Exception {
        _segmentLogger.info("Committing {} offsets", _streamConfig.getType());
        boolean commitSuccessful = false;
        try {
            _streamLevelConsumer.commit();
            commitSuccessful = true;
            _streamLevelConsumer.shutdown();
            _segmentLogger.info("Successfully committed {} offsets, consumer release requested.",
                    _streamConfig.getType());
            _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
            _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
        } catch (Throwable t) {
            _segmentLogger.error("FATAL: Exception committing or shutting down consumer commitSuccessful={}",
                    commitSuccessful, t);
            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
            if (!commitSuccessful) {
                _streamLevelConsumer.shutdown();
            }
        }
        return commitSuccessful;
    }

    /**
     * Builds {@code DONE} segment metadata and hands it to the owning table data manager via
     * {@code replaceHLSegment}. Failures are logged, with severity depending on whether offsets were already
     * committed.
     */
    private void markSegmentDone(String segmentName, long startTime, long endTime, boolean offsetsCommitted,
            IndexLoadingConfig indexLoadingConfig) {
        try {
            _segmentLogger.info("Marking current segment as completed in Helix");
            SegmentZKMetadata doneMetadata = new SegmentZKMetadata(segmentName);
            doneMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
            doneMetadata.setStartTime(startTime);
            doneMetadata.setEndTime(endTime);
            doneMetadata.setTimeUnit(_timeType);
            doneMetadata.setTotalDocs(_realtimeSegment.getNumDocsIndexed());
            _notifier.replaceHLSegment(doneMetadata, indexLoadingConfig);
            _segmentLogger.info(
                    "Completed write of segment completion to Helix, waiting for controller to assign a new segment");
        } catch (Exception e) {
            if (offsetsCommitted) {
                _segmentLogger.error("Offsets were committed to Kafka but we were unable to mark this segment as completed in Helix. Manually mark the segment as completed in Helix; restarting this instance will result in data loss.", e);
            } else {
                _segmentLogger.warn("Caught exception while marking segment as completed in Helix. Offsets were not written, restarting the instance should be safe.", e);
            }
        }
    }

    /** Returns the mutable segment being filled by the indexing thread. */
    @Override
    public MutableSegment getSegment() {
        return _realtimeSegment;
    }

    /** Not supported for high-level consumers. */
    @Override
    public Map<String, String> getPartitionToCurrentOffset() {
        throw new UnsupportedOperationException();
    }

    /** Not supported for high-level consumers. */
    @Override
    public CommonConstants.ConsumerState getConsumerState() {
        throw new UnsupportedOperationException();
    }

    /** Not supported for high-level consumers. */
    @Override
    public long getLastConsumedTimestamp() {
        throw new UnsupportedOperationException();
    }

    @Override
    public String getSegmentName() {
        return _segmentName;
    }

    /**
     * Timer-driven threshold check. Stops indexing once the time deadline or the row limit is reached.
     *
     * <p>If nothing has been indexed when the deadline passes, the deadline is pushed out by another
     * flush-threshold window instead.
     */
    private void computeKeepIndexing() {
        if (_keepIndexing) {
            int numDocsIndexed = _realtimeSegment.getNumDocsIndexed();
            _segmentLogger.debug("Current indexed {} raw events", numDocsIndexed);
            long now = System.currentTimeMillis();
            if (now >= _segmentEndTimeThreshold || numDocsIndexed >= _streamConfig.getFlushThresholdRows()) {
                if (numDocsIndexed == 0) {
                    // NOTE: the extension is getFlushThresholdTimeMillis(), which need not be one hour.
                    _segmentLogger.info("no new events coming in, extending the end time by another hour");
                    _segmentEndTimeThreshold = now + _streamConfig.getFlushThresholdTimeMillis();
                    return;
                }
                _segmentLogger.info(
                        "Stopped indexing due to reaching segment limit: {} raw documents indexed, segment is aged {} minutes",
                        numDocsIndexed, (now - _start) / ONE_MINUTE_IN_MILLSEC);
                _keepIndexing = false;
            }
        }
        updateCurrentDocumentCountMetrics();
    }

    /**
     * Adds to the table's DOCUMENT_COUNT gauge the number of documents indexed since the previous report.
     *
     * <p>This is called from both the timer thread and the indexing thread. {@code getAndSet} guarantees each delta
     * is reported exactly once even if two calls overlap.
     */
    private void updateCurrentDocumentCountMetrics() {
        int numDocsIndexed = _realtimeSegment.getNumDocsIndexed();
        long previouslyReported = _lastUpdatedRawDocuments.getAndSet(numDocsIndexed);
        _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT,
                numDocsIndexed - previouslyReported);
    }

    /**
     * Stops consumption and releases the mutable segment. The stream consumer is shut down, with failures logged.
     * The indexing loop and the timer task are also told to stop.
     *
     * <p>NOTE(review): this does not wait for the indexing thread to exit before destroying the mutable segment.
     */
    @Override
    public void destroy() {
        LOGGER.info("Trying to shutdown RealtimeSegmentDataManager : {}!", _segmentName);
        _isShuttingDown = true;
        try {
            _streamLevelConsumer.shutdown();
        } catch (Exception e) {
            LOGGER.error("Failed to shutdown stream consumer!", e);
        }
        _keepIndexing = false;
        _segmentStatusTask.cancel();
        _realtimeSegment.destroy();
    }
}
