package org.apache.pinot.segment.local.realtime.writer;

import java.io.Closeable;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDataDecoder;
import org.apache.pinot.spi.stream.StreamDataDecoderImpl;
import org.apache.pinot.spi.stream.StreamDataDecoderResult;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Re-consumes the offset range recorded in a segment's ZK metadata from the stream into an in-memory
 * {@link MutableSegmentImpl}, and converts the result into a tarred segment file.
 *
 * <p>Usage, as exposed by this class: construct, call {@link #startConsumption()}, poll
 * {@link #isDoneConsuming()} / {@link #isSuccess()} / {@link #getConsumptionException()}, call
 * {@link #buildSegment()}, and finally {@link #close()}.
 *
 * <p>Consumption runs on a dedicated thread created by {@link #startConsumption()}; the status accessors read
 * volatile/atomic state written by that thread.
 */
public class StatelessRealtimeSegmentWriter implements Closeable {
    private static final int DEFAULT_CAPACITY = 100000;
    private static final int DEFAULT_FETCH_TIMEOUT_MS = 5000;
    public static final String SEGMENT_STATS_FILE_NAME = "segment-stats.ser";
    public static final String RESOURCE_TMP_DIR_PREFIX = "resourceTmpDir_";
    public static final String RESOURCE_DATA_DIR_PREFIX = "resourceDataDir_";

    // Optional throttle shared between segment builds; null means "do not throttle".
    private final Semaphore _segBuildSemaphore;
    private final String _segmentName;
    private final String _tableNameWithType;
    private final int _partitionGroupId;
    private final SegmentZKMetadata _segmentZKMetadata;
    private final TableConfig _tableConfig;
    private final Schema _schema;
    private final StreamConfig _streamConfig;
    private final StreamConsumerFactory _consumerFactory;
    // Not final: re-created by createPartitionMetadataProvider() after a failed partition-count lookup.
    private StreamMetadataProvider _partitionMetadataProvider;
    private final PartitionGroupConsumer _consumer;
    private final StreamDataDecoder _decoder;
    private final MutableSegmentImpl _realtimeSegment;
    private final File _resourceTmpDir;
    private final File _resourceDataDir;
    private final Logger _logger;
    // Null until startConsumption() is called.
    private Thread _consumerThread;
    private final StreamPartitionMsgOffset _startOffset;
    private final StreamPartitionMsgOffset _endOffset;
    private volatile StreamPartitionMsgOffset _currentOffset;
    private final int _fetchTimeoutMs;
    private final TransformPipeline _transformPipeline;
    private volatile Throwable _consumptionException;
    private final AtomicBoolean _isDoneConsuming = new AtomicBoolean(false);
    private volatile boolean _isSuccess = false;

    /**
     * Consumer loop: fetches batches starting at the start offset and indexes every decodable message whose
     * offset is below the end offset, until the next-batch offset reaches the end offset.
     */
    private class PartitionConsumer implements Runnable {
        private PartitionConsumer() {
        }

        @Override
        public void run() {
            try {
                try {
                    _consumer.start(_startOffset);
                    _logger.info("Created new consumer thread {} for {}", _consumerThread, this);
                    _currentOffset = _startOffset;
                    TransformPipeline.Result reusedResult = new TransformPipeline.Result();
                    while (_currentOffset.compareTo(_endOffset) < 0) {
                        MessageBatch messageBatch = _consumer.fetchMessages(_currentOffset, _fetchTimeoutMs);
                        int messageCount = messageBatch.getMessageCount();
                        // Bounded loop: must terminate when the batch is exhausted so that the offset below
                        // advances to the next batch.
                        for (int i = 0; i < messageCount; i++) {
                            StreamMessage streamMessage = messageBatch.getStreamMessage(i);
                            if (isAtOrPastEndOffset(streamMessage)) {
                                _logger.info("Reached end offset: {} for partition group: {}", _endOffset, _partitionGroupId);
                                break;
                            }
                            indexMessage(streamMessage, reusedResult);
                        }
                        _currentOffset = messageBatch.getOffsetOfNextBatch();
                    }
                    _isSuccess = true;
                } finally {
                    try {
                        _consumer.close();
                    } catch (Exception e) {
                        _logger.warn("Failed to close consumer", e);
                    }
                    // Set last so that observers of "done" also see the final value of _isSuccess.
                    _isDoneConsuming.set(true);
                }
            } catch (Exception e) {
                _logger.error("Exception in consumer thread", e);
                _consumptionException = e;
                throw new RuntimeException(e);
            }
        }

        /** Returns true when the message carries an offset that is at or beyond the end offset. */
        private boolean isAtOrPastEndOffset(StreamMessage streamMessage) {
            return streamMessage.getMetadata() != null
                && streamMessage.getMetadata().getOffset() != null
                && streamMessage.getMetadata().getOffset().compareTo(_endOffset) >= 0;
        }

        /** Decodes one message, runs it through the transform pipeline and indexes the resulting rows. */
        private void indexMessage(StreamMessage streamMessage, TransformPipeline.Result reusedResult) throws Exception {
            StreamDataDecoderResult decodedRow = _decoder.decode(streamMessage);
            if (decodedRow.getException() != null) {
                // Undecodable messages are skipped, not treated as fatal.
                _logger.warn("Failed to decode message at offset {}: {}", _currentOffset, decodedRow.getException());
                return;
            }
            _transformPipeline.processRow(decodedRow.getResult(), reusedResult);
            for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
                _realtimeSegment.index(transformedRow, streamMessage.getMetadata());
            }
        }
    }

    /**
     * Sets up working directories, the stream consumer, the decoder and the mutable segment for the segment
     * described by {@code segmentZKMetadata}.
     *
     * @param segmentZKMetadata metadata of the segment to rebuild; supplies the segment name and start/end offsets
     * @param indexLoadingConfig must carry a non-null table config and schema
     * @param semaphore optional semaphore limiting concurrent segment builds; may be null
     * @throws Exception if directory creation, stream client creation or decoder creation fails
     */
    public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, @Nullable Semaphore semaphore) throws Exception {
        Preconditions.checkNotNull(indexLoadingConfig.getTableConfig(), "Table config must be set in index loading config");
        Preconditions.checkNotNull(indexLoadingConfig.getSchema(), "Schema must be set in index loading config");

        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
        _segmentName = segmentZKMetadata.getSegmentName();
        _partitionGroupId = llcSegmentName.getPartitionGroupId();
        _segBuildSemaphore = semaphore;
        _tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(llcSegmentName.getTableName());
        _segmentZKMetadata = segmentZKMetadata;
        _tableConfig = indexLoadingConfig.getTableConfig();
        _schema = indexLoadingConfig.getSchema();

        // Fall back to the JVM temp directory when no table data dir is configured.
        String tableDataDir = indexLoadingConfig.getTableDataDir() == null
            ? FileUtils.getTempDirectory().getAbsolutePath()
            : indexLoadingConfig.getTableDataDir();
        File reingestionDir = new File(tableDataDir, "reingestion");
        _resourceTmpDir = new File(reingestionDir, RESOURCE_TMP_DIR_PREFIX + _segmentName + "_" + System.currentTimeMillis());
        _resourceDataDir = new File(reingestionDir, RESOURCE_DATA_DIR_PREFIX + _segmentName + "_" + System.currentTimeMillis());
        FileUtils.deleteQuietly(_resourceTmpDir);
        FileUtils.deleteQuietly(_resourceDataDir);
        FileUtils.forceMkdir(_resourceTmpDir);
        FileUtils.forceMkdir(_resourceDataDir);

        _logger = LoggerFactory.getLogger(StatelessRealtimeSegmentWriter.class.getName() + "_" + _segmentName);

        _streamConfig = IngestionConfigUtils.getFirstStreamConfig(_tableConfig);
        // One factory instance serves the offset factory, the metadata provider and the consumer.
        _consumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
        StreamPartitionMsgOffsetFactory offsetFactory = _consumerFactory.createStreamMsgOffsetFactory();
        _startOffset = offsetFactory.create(segmentZKMetadata.getStartOffset());
        _endOffset = offsetFactory.create(segmentZKMetadata.getEndOffset());

        String clientId = getClientId();
        _partitionMetadataProvider = _consumerFactory.createPartitionMetadataProvider(clientId, _partitionGroupId);
        _consumer = _consumerFactory.createPartitionGroupConsumer(clientId, new PartitionGroupConsumptionStatus(_partitionGroupId, 0, _startOffset, null, null));
        _decoder = createDecoder(IngestionUtils.getFieldsForRecordExtractor(_tableConfig, _schema));

        int flushThresholdRows = _streamConfig.getFlushThresholdRows();
        int capacity = flushThresholdRows <= 0 ? DEFAULT_CAPACITY : flushThresholdRows;
        RealtimeSegmentConfig.Builder segmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
            .setTableNameWithType(_tableNameWithType)
            .setSegmentName(_segmentName)
            .setStreamName(_streamConfig.getTopicName())
            .setSegmentZKMetadata(_segmentZKMetadata)
            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tableDataDir, SEGMENT_STATS_FILE_NAME)))
            .setSchema(_schema)
            .setCapacity(capacity)
            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
            .setOffHeap(indexLoadingConfig.isRealtimeOffHeapAllocation())
            .setFieldConfigList(_tableConfig.getFieldConfigList())
            .setConsumerDir(_resourceDataDir.getAbsolutePath())
            .setMemoryManager(new MmapMemoryManager(FileUtils.getTempDirectory().getAbsolutePath(), _segmentName, null));
        setPartitionParameters(segmentConfigBuilder, _tableConfig.getIndexingConfig().getSegmentPartitionConfig());
        _realtimeSegment = new MutableSegmentImpl(segmentConfigBuilder.build(), null);

        _transformPipeline = new TransformPipeline(_tableConfig, _schema);
        _fetchTimeoutMs = _streamConfig.getFetchTimeoutMillis() > 0 ? _streamConfig.getFetchTimeoutMillis() : DEFAULT_FETCH_TIMEOUT_MS;
    }

    /** Returns the client id used for stream clients: {@code <tableNameWithType>-<partitionGroupId>}. */
    private String getClientId() {
        return _tableNameWithType + "-" + _partitionGroupId;
    }

    /** Starts the consumer loop on a new thread named after the segment. */
    public void startConsumption() {
        _consumerThread = new Thread(new PartitionConsumer(), _segmentName);
        _consumerThread.start();
    }

    /**
     * Creates the stream data decoder, retrying with exponential backoff (5 attempts, 1000 ms initial delay).
     *
     * @param set names of the fields the decoder should extract
     * @throws Exception propagated from the retry policy's {@code attempt} call
     */
    private StreamDataDecoder createDecoder(Set<String> set) throws Exception {
        AtomicReference<StreamDataDecoder> decoderRef = new AtomicReference<>();
        RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f).attempt(() -> {
            try {
                decoderRef.set(new StreamDataDecoderImpl(createMessageDecoder(set)));
                return true;
            } catch (Exception e) {
                _logger.warn("Failed to create StreamMessageDecoder. Retrying...", e);
                return false;
            }
        });
        return decoderRef.get();
    }

    /**
     * Instantiates and initializes the message decoder class named in the stream config.
     *
     * @throws RuntimeException wrapping any instantiation or initialization failure
     */
    private StreamMessageDecoder createMessageDecoder(Set<String> set) {
        try {
            StreamMessageDecoder messageDecoder = (StreamMessageDecoder) PluginManager.get().createInstance(_streamConfig.getDecoderClass());
            messageDecoder.init(set, _streamConfig, _tableConfig, _schema);
            return messageDecoder;
        } catch (Exception e) {
            throw new RuntimeException("Caught exception while creating StreamMessageDecoder from stream config: " + _streamConfig, e);
        }
    }

    /**
     * Interrupts the consumer thread, if it is running, and waits for it to exit. A no-op when consumption was
     * never started or has already finished.
     */
    public void stopConsumption() {
        Thread consumerThread = _consumerThread;
        if (consumerThread == null || !consumerThread.isAlive()) {
            return;
        }
        consumerThread.interrupt();
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            _logger.warn("Interrupted while waiting for consumer thread to finish");
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true once the consumer thread has finished, successfully or not. */
    public boolean isDoneConsuming() {
        return _isDoneConsuming.get();
    }

    /** Returns true if the consumer loop reached the end offset without throwing. */
    public boolean isSuccess() {
        return _isSuccess;
    }

    /** Returns the exception that terminated the consumer thread, or null if none was recorded. */
    public Throwable getConsumptionException() {
        return _consumptionException;
    }

    /**
     * Converts the mutable segment into an on-disk segment under the temporary resource directory and tars it.
     * When a build semaphore was supplied, blocks until a permit is acquired and releases it when done.
     *
     * @return the {@code <segmentName>.tar.gz} file inside the temporary resource directory
     * @throws RuntimeException if the wait for the semaphore is interrupted, or the build/tar step fails
     */
    public File buildSegment() {
        _logger.info("Building segment from {} to {}", _startOffset, _endOffset);
        long lockAcquireStartMs = now();
        acquireBuildSemaphore();
        try {
            long buildStartMs = now();
            _logger.info("Acquired lock for building segment in {} ms", buildStartMs - lockAcquireStartMs);

            SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
            segmentZKPropsConfig.setStartOffset(_startOffset.toString());
            segmentZKPropsConfig.setEndOffset(_endOffset.toString());
            RealtimeSegmentConverter converter = new RealtimeSegmentConverter(_realtimeSegment, segmentZKPropsConfig, _resourceTmpDir.getAbsolutePath(), _schema, _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _tableConfig.getIndexingConfig().isNullHandlingEnabled());
            try {
                converter.build(null, null);
                _logger.info("Successfully built segment (Column Mode: {}) in {} ms", converter.isColumnMajorEnabled(), now() - buildStartMs);
                return tarIndexDir();
            } catch (Exception e) {
                throw new RuntimeException("Failed to build segment", e);
            }
        } finally {
            // Reached only after a successful acquire, so the permit count stays balanced.
            if (_segBuildSemaphore != null) {
                _logger.info("Releasing semaphore for building segment");
                _segBuildSemaphore.release();
            }
        }
    }

    /**
     * Blocks until a build permit is obtained, doubling the per-attempt wait from 5 s up to a 300 s cap and
     * logging after each failed attempt. A no-op when no semaphore was supplied.
     */
    private void acquireBuildSemaphore() {
        if (_segBuildSemaphore == null) {
            return;
        }
        _logger.info("Trying to acquire semaphore for building segment: {}", _segmentName);
        Instant acquireStart = Instant.now();
        int timeoutSeconds = 5;
        try {
            while (!_segBuildSemaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS)) {
                _logger.warn("Could not acquire semaphore for building segment in {}", Duration.between(acquireStart, Instant.now()));
                timeoutSeconds = Math.min(timeoutSeconds * 2, 300);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for segment build semaphore", e);
        }
    }

    /** Tars the built index directory into {@code <segmentName>.tar.gz} next to it and returns the tar file. */
    private File tarIndexDir() {
        File indexDir = new File(_resourceTmpDir, _segmentName);
        File segmentTarFile = new File(_resourceTmpDir, _segmentName + ".tar.gz");
        try {
            TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
            return segmentTarFile;
        } catch (Exception e) {
            throw new RuntimeException("Caught exception while tarring index directory from: " + indexDir + " to: " + segmentTarFile, e);
        }
    }

    /** Current wall-clock time in milliseconds; overridable by subclasses. */
    protected long now() {
        return System.currentTimeMillis();
    }

    /**
     * Configures partition column, function and id on the builder when the table has a partition config with
     * exactly one column. The partition count reported by the stream is preferred over the configured one; if
     * the lookup fails the configured value is used and the metadata provider is re-created.
     */
    private void setPartitionParameters(RealtimeSegmentConfig.Builder builder, SegmentPartitionConfig segmentPartitionConfig) {
        if (segmentPartitionConfig == null) {
            return;
        }
        Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
        if (columnPartitionMap.size() != 1) {
            _logger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
            return;
        }

        Map.Entry<String, ColumnPartitionConfig> onlyEntry = columnPartitionMap.entrySet().iterator().next();
        String partitionColumn = onlyEntry.getKey();
        ColumnPartitionConfig columnPartitionConfig = onlyEntry.getValue();
        String partitionFunctionName = columnPartitionConfig.getFunctionName();
        int numPartitions = columnPartitionConfig.getNumPartitions();
        try {
            // 5000 ms wait budget; the failure message below refers to it as "5s".
            int numStreamPartitions = _partitionMetadataProvider.computePartitionGroupMetadata(getClientId(), _streamConfig, Collections.emptyList(), 5000).size();
            if (numStreamPartitions != numPartitions) {
                _logger.info("Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", numStreamPartitions, numPartitions);
                numPartitions = numStreamPartitions;
            }
        } catch (Exception e) {
            _logger.warn("Failed to get number of stream partitions in 5s, using number of partitions in the partition config: {}", numPartitions, e);
            createPartitionMetadataProvider("Timeout getting number of stream partitions");
        }

        builder.setPartitionColumn(partitionColumn);
        builder.setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions, null));
        builder.setPartitionId(_partitionGroupId);
    }

    /** Closes the current partition metadata provider (if any) and replaces it with a fresh one. */
    private void createPartitionMetadataProvider(String str) {
        closePartitionMetadataProvider();
        _logger.info("Creating new partition metadata provider, reason: {}", str);
        _partitionMetadataProvider = _consumerFactory.createPartitionMetadataProvider(getClientId(), _partitionGroupId);
    }

    /** Best-effort close of the partition metadata provider; failures are logged and otherwise ignored. */
    private void closePartitionMetadataProvider() {
        if (_partitionMetadataProvider != null) {
            try {
                _partitionMetadataProvider.close();
            } catch (Exception e) {
                _logger.warn("Could not close stream metadata provider", e);
            }
        }
    }

    /**
     * Stops consumption, closes stream clients, destroys the mutable segment and deletes both working
     * directories (which includes any tar file produced by {@link #buildSegment()}).
     */
    @Override
    public void close() {
        stopConsumption();
        if (_consumerThread == null) {
            // The consumer thread closes the consumer itself; when it never ran, nobody else will.
            try {
                _consumer.close();
            } catch (Exception e) {
                _logger.warn("Failed to close consumer", e);
            }
        }
        closePartitionMetadataProvider();
        try {
            _realtimeSegment.destroy();
        } finally {
            // Clean up the working directories even if destroying the segment fails.
            FileUtils.deleteQuietly(_resourceTmpDir);
            FileUtils.deleteQuietly(_resourceDataDir);
        }
    }
}
