package org.apache.pinot.core.segment.processing.framework;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Turns a set of input records into Pinot segments in three phases.
 *
 * <ol>
 *   <li>Map: {@link SegmentMapper} reads all inputs and returns one {@link GenericRowFileManager} per partition,
 *   writing under {@code <workingDir>/mapper_output}.</li>
 *   <li>Reduce: for every partition, the reducer obtained from {@link ReducerFactory} produces a new file manager,
 *   writing under {@code <workingDir>/reducer_output}.</li>
 *   <li>Segment creation: every reduced partition is split into row ranges whose sizes come from a
 *   {@link SegmentNumRowProvider}, and one segment is built per range under {@code <workingDir>/segments_output}.</li>
 * </ol>
 *
 * <p>The mapper and reducer output directories are removed when {@link #process()} finishes, whether it succeeds or
 * fails. On failure, the segments output directory is removed as well.
 */
public class SegmentProcessorFramework {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
    private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
    private final SegmentProcessorConfig _segmentProcessorConfig;
    private final File _mapperOutputDir;
    private final File _reducerOutputDir;
    private final File _segmentsOutputDir;
    // Set by the map phase; entries are replaced by the reduce phase. Read by failure cleanup in process().
    private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
    private final SegmentNumRowProvider _segmentNumRowProvider;

    /**
     * Creates a framework over plain record readers.
     *
     * @param recordReaders inputs to process; must not be empty
     * @param segmentProcessorConfig processing configuration
     * @param workingDir directory under which the mapper, reducer and segments output directories are created
     * @throws IOException if any of the output directories cannot be created
     * @deprecated use
     *     {@link #SegmentProcessorFramework(SegmentProcessorConfig, File, List, SegmentNumRowProvider)} instead
     */
    @Deprecated
    public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
        File workingDir)
        throws IOException {
        this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders), null);
    }

    /**
     * Creates the framework and its three working sub-directories.
     *
     * @param segmentProcessorConfig processing configuration (table config, schema, segment config, progress
     *     observer)
     * @param workingDir directory under which {@code mapper_output}, {@code reducer_output} and
     *     {@code segments_output} are created
     * @param recordReaderFileConfigs inputs to process; must not be empty
     * @param segmentNumRowProvider supplies the number of rows for each segment. When {@code null}, a
     *     {@code DefaultSegmentNumRowProvider} built from the segment config's max number of records per segment
     *     is used.
     * @throws IllegalStateException if {@code recordReaderFileConfigs} is empty
     * @throws IOException if any of the working sub-directories cannot be created
     */
    public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
        List<RecordReaderFileConfig> recordReaderFileConfigs, SegmentNumRowProvider segmentNumRowProvider)
        throws IOException {
        Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
        LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
            recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
        _recordReaderFileConfigs = recordReaderFileConfigs;
        _segmentProcessorConfig = segmentProcessorConfig;

        _mapperOutputDir = new File(workingDir, "mapper_output");
        FileUtils.forceMkdir(_mapperOutputDir);
        _reducerOutputDir = new File(workingDir, "reducer_output");
        FileUtils.forceMkdir(_reducerOutputDir);
        _segmentsOutputDir = new File(workingDir, "segments_output");
        FileUtils.forceMkdir(_segmentsOutputDir);

        _segmentNumRowProvider = segmentNumRowProvider != null ? segmentNumRowProvider
            : new DefaultSegmentNumRowProvider(segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
    }

    /** Wraps each record reader into a {@link RecordReaderFileConfig}, rejecting an empty input list. */
    private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
        List<RecordReader> recordReaders) {
        Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
        List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>(recordReaders.size());
        for (RecordReader recordReader : recordReaders) {
            recordReaderFileConfigs.add(new RecordReaderFileConfig(recordReader));
        }
        return recordReaderFileConfigs;
    }

    /**
     * Runs the map, reduce and segment creation phases.
     *
     * @return the output directories of the created segments. The list is empty when the mapper produced no
     *     partition.
     * @throws Exception if any phase fails. Before rethrowing, every known file manager is cleaned up, and the
     *     segments, mapper and reducer output directories are deleted. Failures during that cleanup are attached
     *     to the original exception as suppressed exceptions instead of replacing it.
     */
    public List<File> process()
        throws Exception {
        List<File> outputSegmentDirs;
        try {
            outputSegmentDirs = doProcess();
        } catch (Exception e) {
            // Clean up file managers no matter whether they come from the map phase or the reduce phase, and drop
            // any (partially) created segments as processing has failed.
            cleanUpFileManagers(e);
            FileUtils.deleteQuietly(_segmentsOutputDir);
            deleteDirectoryOnFailure(_mapperOutputDir, e);
            deleteDirectoryOnFailure(_reducerOutputDir, e);
            throw e;
        } catch (Error e) {
            deleteDirectoryOnFailure(_mapperOutputDir, e);
            deleteDirectoryOnFailure(_reducerOutputDir, e);
            throw e;
        }
        // Success path. The deletions sit outside the try block above, so a failure to delete the intermediate
        // directories propagates without triggering deletion of the segments that were just built.
        FileUtils.deleteDirectory(_mapperOutputDir);
        FileUtils.deleteDirectory(_reducerOutputDir);
        return outputSegmentDirs;
    }

    /** Best-effort cleanup of all known file managers; each cleanup failure is recorded on {@code failure}. */
    private void cleanUpFileManagers(Throwable failure) {
        if (_partitionToFileManagerMap == null) {
            return;
        }
        for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
            try {
                fileManager.cleanUp();
            } catch (Exception cleanUpException) {
                failure.addSuppressed(cleanUpException);
            }
        }
    }

    /** Deletes {@code dir}, recording a deletion failure on {@code failure} instead of masking it. */
    private static void deleteDirectoryOnFailure(File dir, Throwable failure) {
        try {
            FileUtils.deleteDirectory(dir);
        } catch (IOException deleteException) {
            failure.addSuppressed(deleteException);
        }
    }

    /** Runs the three phases and returns the created segment directories. */
    private List<File> doProcess()
        throws Exception {
        LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
        SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _segmentProcessorConfig, _mapperOutputDir);
        _partitionToFileManagerMap = mapper.map();
        if (_partitionToFileManagerMap.isEmpty()) {
            LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
            return Collections.emptyList();
        }

        LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
        Consumer<Object> progressObserver = _segmentProcessorConfig.getProgressObserver();
        reducePartitions(progressObserver);

        LOGGER.info("Beginning segment creation phase on partitions: {}", _partitionToFileManagerMap.keySet());
        return createSegments(progressObserver);
    }

    /** Reduces every partition, replacing its map-phase file manager with the one returned by the reducer. */
    private void reducePartitions(Consumer<Object> progressObserver)
        throws Exception {
        int numPartitions = _partitionToFileManagerMap.size();
        int partitionIndex = 1;
        for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
            String partitionId = entry.getKey();
            progressObserver.accept(String.format("Doing reduce phase on data from partition: %s (%d out of %d)",
                partitionId, partitionIndex++, numPartitions));
            // Store the reducer output in the map so segment creation and failure cleanup see the latest manager
            entry.setValue(
                ReducerFactory.getReducer(partitionId, entry.getValue(), _segmentProcessorConfig, _reducerOutputDir)
                    .reduce());
        }
    }

    /**
     * Builds segments for every reduced partition. Each partition is cut into consecutive row ranges sized by
     * {@link SegmentNumRowProvider#getNumRows()}. Sequence ids increase across all partitions, and each partition's
     * file manager is cleaned up once its segments are built (or on failure).
     */
    private List<File> createSegments(Consumer<Object> progressObserver)
        throws Exception {
        TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
        Schema schema = _segmentProcessorConfig.getSchema();
        String segmentNamePrefix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
        String segmentNamePostfix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePostfix();
        String fixedSegmentName = _segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();

        SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
        segmentGeneratorConfig.setOutDir(_segmentsOutputDir.getPath());
        if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
            segmentGeneratorConfig.setSegmentNameGenerator(
                SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
                    segmentNamePostfix, fixedSegmentName, false));
        } else {
            segmentGeneratorConfig.setSegmentNamePrefix(segmentNamePrefix);
            segmentGeneratorConfig.setSegmentNamePostfix(segmentNamePostfix);
        }

        List<File> outputSegmentDirs = new ArrayList<>();
        int sequenceId = 0;
        for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
            String partitionId = entry.getKey();
            GenericRowFileManager fileManager = entry.getValue();
            try {
                GenericRowFileReader fileReader = fileManager.getFileReader();
                int numRows = fileReader.getNumRows();
                LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId,
                    numRows, fileReader.getNumSortFields());
                GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
                int startRowId = 0;
                while (startRowId < numRows) {
                    int segmentNumRows = _segmentNumRowProvider.getNumRows();
                    // A non-positive row count would never advance startRowId and loop forever
                    Preconditions.checkState(segmentNumRows > 0,
                        "Number of rows per segment must be positive, got: %s", segmentNumRows);
                    // Compute in long: startRowId + segmentNumRows can exceed Integer.MAX_VALUE
                    int endRowId = (int) Math.min((long) startRowId + segmentNumRows, numRows);
                    LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId,
                        startRowId, endRowId);
                    progressObserver.accept(String.format(
                        "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)",
                        sequenceId, partitionId, startRowId, endRowId, numRows));
                    segmentGeneratorConfig.setSequenceId(sequenceId);
                    GenericRowFileRecordReader recordReaderForRange =
                        recordReader.getRecordReaderForRange(startRowId, endRowId);
                    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
                    driver.init(segmentGeneratorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
                        TransformPipeline.getPassThroughPipeline());
                    driver.build();
                    File segmentDir = driver.getOutputDirectory();
                    outputSegmentDirs.add(segmentDir);
                    // Report the built segment's doc count and on-disk size back to the provider (presumably so it
                    // can adjust subsequent getNumRows() values)
                    _segmentNumRowProvider.updateSegmentInfo(driver.getSegmentStats().getTotalDocCount(),
                        FileUtils.sizeOfDirectory(segmentDir));
                    startRowId = endRowId;
                    sequenceId++;
                }
            } finally {
                fileManager.cleanUp();
            }
        }
        LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
        return outputSegmentDirs;
    }
}
