package org.apache.pinot.core.segment.processing.framework;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.tasks.MinionTaskBaseObserverStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.class */
/**
 * Drives a map / reduce / segment-generation pipeline over a list of record readers.
 *
 * <p>Each iteration of {@link #process()} hands the not-yet-finished record readers to a
 * {@link SegmentMapper}, reduces every partition the mapper produced, and then builds segments from
 * the reduced data into the {@code segments_output} directory under the working directory. The
 * {@code mapper_output} and {@code reducer_output} directories are intermediate and are removed when
 * {@link #process()} finishes.
 *
 * <p>The instance keeps a running segment sequence id, so it is stateful across iterations.
 */
public class SegmentProcessorFramework {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);

    /** Stage name reported to the progress observer while mapping. */
    public static final String MAP_STAGE = "MAP";
    /** Stage name reported to the progress observer while reducing. */
    public static final String REDUCE_STAGE = "REDUCE";
    /** Stage name reported to the progress observer while generating segments. */
    public static final String GENERATE_STAGE = "GENERATE_SEGMENT";

    private final List<RecordReaderFileConfig> _recordReaderFileConfigs;
    // Exactly one of _customRecordTransformers / _transformPipeline is supplied by the public constructors;
    // getSegmentMapper() prefers the pipeline when it is non-null.
    private final List<RecordTransformer> _customRecordTransformers;
    private final TransformPipeline _transformPipeline;
    private final SegmentProcessorConfig _segmentProcessorConfig;
    private final File _mapperOutputDir;
    private final File _reducerOutputDir;
    private final File _segmentsOutputDir;
    private final SegmentNumRowProvider _segmentNumRowProvider;
    // Sequence id of the next segment to create; shared across partitions and iterations.
    private int _segmentSequenceId;

    /**
     * Creates a framework from already-opened record readers, with no custom record transformers and the
     * default row-count provider.
     *
     * @param recordReaders record readers to process; must not be empty
     * @param segmentProcessorConfig processing configuration
     * @param workingDir directory under which the mapper, reducer and segment output directories are created
     * @throws IOException if one of the output directories cannot be created
     * @deprecated use the constructor taking a list of {@link RecordReaderFileConfig} instead
     */
    @Deprecated
    public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
            File workingDir) throws IOException {
        this(segmentProcessorConfig, workingDir, convertRecordReadersToRecordReaderFileConfig(recordReaders),
                Collections.<RecordTransformer>emptyList(), null);
    }

    /**
     * Creates a framework whose mapper applies the given custom record transformers.
     *
     * @param segmentProcessorConfig processing configuration
     * @param workingDir directory under which the mapper, reducer and segment output directories are created
     * @param recordReaderFileConfigs record reader configs to process; must not be empty
     * @param customRecordTransformers transformers handed to the {@link SegmentMapper}
     * @param segmentNumRowProvider provider of the per-segment row count, or {@code null} to use a
     *     {@link DefaultSegmentNumRowProvider} built from the configured max records per segment
     * @throws IOException if one of the output directories cannot be created
     */
    public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
            List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
            SegmentNumRowProvider segmentNumRowProvider) throws IOException {
        this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, customRecordTransformers, null,
                segmentNumRowProvider);
    }

    /**
     * Creates a framework whose mapper applies the given transform pipeline.
     *
     * @param segmentProcessorConfig processing configuration
     * @param workingDir directory under which the mapper, reducer and segment output directories are created
     * @param recordReaderFileConfigs record reader configs to process; must not be empty
     * @param transformPipeline pipeline handed to the {@link SegmentMapper}
     * @param segmentNumRowProvider provider of the per-segment row count, or {@code null} to use a
     *     {@link DefaultSegmentNumRowProvider} built from the configured max records per segment
     * @throws IOException if one of the output directories cannot be created
     */
    public SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
            List<RecordReaderFileConfig> recordReaderFileConfigs, TransformPipeline transformPipeline,
            SegmentNumRowProvider segmentNumRowProvider) throws IOException {
        this(segmentProcessorConfig, workingDir, recordReaderFileConfigs, null, transformPipeline,
                segmentNumRowProvider);
    }

    /**
     * Common constructor: validates the input, records the configuration and creates the three working
     * sub-directories ({@code mapper_output}, {@code reducer_output}, {@code segments_output}).
     *
     * @throws IllegalStateException if {@code recordReaderFileConfigs} is empty
     * @throws IOException if one of the output directories cannot be created
     */
    protected SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfig, File workingDir,
            List<RecordReaderFileConfig> recordReaderFileConfigs, List<RecordTransformer> customRecordTransformers,
            TransformPipeline transformPipeline, SegmentNumRowProvider segmentNumRowProvider) throws IOException {
        Preconditions.checkState(!recordReaderFileConfigs.isEmpty(), "No recordReaderFileConfigs provided");
        LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
                recordReaderFileConfigs.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
        _recordReaderFileConfigs = recordReaderFileConfigs;
        _customRecordTransformers = customRecordTransformers;
        _transformPipeline = transformPipeline;
        _segmentProcessorConfig = segmentProcessorConfig;

        _mapperOutputDir = new File(workingDir, "mapper_output");
        FileUtils.forceMkdir(_mapperOutputDir);
        _reducerOutputDir = new File(workingDir, "reducer_output");
        FileUtils.forceMkdir(_reducerOutputDir);
        _segmentsOutputDir = new File(workingDir, "segments_output");
        FileUtils.forceMkdir(_segmentsOutputDir);

        if (segmentNumRowProvider != null) {
            _segmentNumRowProvider = segmentNumRowProvider;
        } else {
            _segmentNumRowProvider = new DefaultSegmentNumRowProvider(
                    segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment());
        }
    }

    /**
     * Wraps each record reader in a {@link RecordReaderFileConfig}, preserving order.
     *
     * @param list record readers to wrap; must not be empty
     * @return a new list with one config per record reader
     * @throws IllegalStateException if {@code list} is empty
     */
    public static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(List<RecordReader> list) {
        Preconditions.checkState(!list.isEmpty(), "No record reader is provided");
        List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>(list.size());
        for (RecordReader recordReader : list) {
            recordReaderFileConfigs.add(new RecordReaderFileConfig(recordReader));
        }
        return recordReaderFileConfigs;
    }

    /**
     * Runs the full pipeline and returns the directories of the generated segments.
     *
     * <p>If processing fails with an exception, the segments output directory is deleted (best effort)
     * before the exception is rethrown. The mapper and reducer output directories are always removed,
     * whether processing succeeded or not.
     *
     * @return output directories of all segments that were built
     * @throws Exception if any stage fails, or if the intermediate directories cannot be removed
     */
    public List<File> process() throws Exception {
        try {
            return doProcess();
        } catch (Exception e) {
            // Do not leave partially built segments behind for the caller to pick up.
            FileUtils.deleteQuietly(_segmentsOutputDir);
            throw e;
        } finally {
            // Nested finally so that a failure to remove the mapper directory does not skip the reducer
            // directory. A cleanup failure here propagates to the caller (and, per Java semantics,
            // replaces any exception already in flight), but it no longer triggers deletion of
            // successfully generated segments.
            try {
                FileUtils.deleteDirectory(_mapperOutputDir);
            } finally {
                FileUtils.deleteDirectory(_reducerOutputDir);
            }
        }
    }

    /**
     * Repeats map, reduce and generate until every record reader reports that it is done.
     *
     * <p>A single map call may stop before consuming all readers; the next iteration resumes from the
     * first reader that is not yet done. Extra per-iteration progress messages are emitted only when the
     * configured intermediate file size threshold differs from {@link Long#MAX_VALUE}.
     */
    private List<File> doProcess() throws Exception {
        List<File> outputSegmentDirs = new ArrayList<>();
        int numRecordReaders = _recordReaderFileConfigs.size();
        int nextRecordReaderIndex = 0;
        int iteration = 1;
        boolean sizeThresholdConfigured =
                _segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() != Long.MAX_VALUE;

        while (nextRecordReaderIndex < numRecordReaders) {
            List<RecordReaderFileConfig> remainingConfigs =
                    _recordReaderFileConfigs.subList(nextRecordReaderIndex, numRecordReaders);
            SegmentMapper mapper = getSegmentMapper(remainingConfigs);
            if (sizeThresholdConfigured) {
                String startMessage = String.format(
                        "Starting iteration %d with %d record readers. Starting index = %d, end index = %d",
                        iteration, remainingConfigs.size(), nextRecordReaderIndex + 1, numRecordReaders);
                LOGGER.info(startMessage);
                logToObserver(MAP_STAGE, startMessage);
            }

            long mapStartTimeMs = System.currentTimeMillis();
            logToObserver(MAP_STAGE, "Starting Map phase for iteration " + iteration);
            Map<String, GenericRowFileManager> partitionToFileManager = mapper.map();
            String mapFinishedMessage = "Finished Map phase for iteration " + iteration + " in "
                    + (System.currentTimeMillis() - mapStartTimeMs) + "ms";
            LOGGER.info(mapFinishedMessage);
            logToObserver(MAP_STAGE, mapFinishedMessage);

            if (partitionToFileManager.isEmpty()) {
                // Nothing to reduce or generate; just move on. The iteration counter is intentionally
                // not advanced here, matching the counting of iterations that produced output.
                LOGGER.info("No mapper output files generated, skipping reduce phase");
                logToObserver(MAP_STAGE, "No mapper output files generated, skipping reduce phase");
                nextRecordReaderIndex = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndex);
                continue;
            }

            logToObserver(REDUCE_STAGE, "Starting Reduce phase for iteration " + iteration);
            doReduce(partitionToFileManager);
            logToObserver(GENERATE_STAGE, "Generating segments for iteration " + iteration);
            outputSegmentDirs.addAll(generateSegment(partitionToFileManager));

            int iterationStartIndex = nextRecordReaderIndex;
            nextRecordReaderIndex = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndex);
            if (sizeThresholdConfigured) {
                String finishedMessage;
                if (nextRecordReaderIndex == numRecordReaders) {
                    finishedMessage = String.format("Finished processing all of %d RecordReaders", numRecordReaders);
                } else {
                    // Indices are reported 1-based; the reader at nextRecordReaderIndex is not done yet.
                    finishedMessage = String.format(
                            "Finished processing RecordReaders %d to %d (RecordReader %d might be partially processed) out of %d in iteration %d",
                            iterationStartIndex + 1, nextRecordReaderIndex + 1, nextRecordReaderIndex + 1,
                            numRecordReaders, iteration);
                }
                LOGGER.info(finishedMessage);
                logToObserver(GENERATE_STAGE, finishedMessage);
            }
            iteration++;
        }
        return outputSegmentDirs;
    }

    /** Sends a stage/status entry to the configured progress observer. */
    private void logToObserver(String str, String str2) {
        _segmentProcessorConfig.getProgressObserver()
                .accept(new MinionTaskBaseObserverStats.StatusEntry.Builder().withStage(str).withStatus(str2).build());
    }

    /**
     * Builds the mapper for the given record reader configs. Uses the transform pipeline when one was
     * supplied, otherwise the custom record transformers.
     *
     * @param list record reader configs the mapper should consume
     * @return a new {@link SegmentMapper} writing into the mapper output directory
     */
    protected SegmentMapper getSegmentMapper(List<RecordReaderFileConfig> list) {
        if (_transformPipeline != null) {
            return new SegmentMapper(list, _transformPipeline, _segmentProcessorConfig, _mapperOutputDir);
        }
        return new SegmentMapper(list, _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir);
    }

    /**
     * Returns the index of the first record reader config at or after {@code i} that is not done, or the
     * total number of configs when all of them are done.
     */
    private int getNextRecordReaderIndexToBeProcessed(int i) {
        int numRecordReaders = _recordReaderFileConfigs.size();
        int index = i;
        while (index < numRecordReaders && _recordReaderFileConfigs.get(index).isRecordReaderDone()) {
            index++;
        }
        return index;
    }

    /**
     * Reduces every partition in place: each map value is replaced by the file manager returned from the
     * partition's reducer.
     */
    private void doReduce(Map<String, GenericRowFileManager> map) throws Exception {
        LOGGER.info("Beginning reduce phase on partitions: {}", map.keySet());
        Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
        int numPartitions = map.size();
        int partitionOrdinal = 0;
        for (Map.Entry<String, GenericRowFileManager> partition : map.entrySet()) {
            partitionOrdinal++;
            String partitionId = partition.getKey();
            observer.accept(String.format("Doing reduce phase on data from partition: %s (%d out of %d)",
                    partitionId, partitionOrdinal, numPartitions));
            GenericRowFileManager reduced = ReducerFactory
                    .getReducer(partitionId, partition.getValue(), _segmentProcessorConfig, _reducerOutputDir)
                    .reduce();
            partition.setValue(reduced);
        }
    }

    /**
     * Builds segments for every partition, splitting each partition into row ranges whose size is given
     * by the {@link SegmentNumRowProvider}. Each partition's file manager is cleaned up once the
     * partition has been handled, even if segment creation fails.
     *
     * @param map partition id to the file manager holding that partition's rows
     * @return output directories of the segments created by this call
     * @throws IllegalStateException if the row-count provider returns a non-positive value
     */
    private List<File> generateSegment(Map<String, GenericRowFileManager> map) throws Exception {
        LOGGER.info("Beginning segment creation phase on partitions: {}", map.keySet());
        List<File> outputSegmentDirs = new ArrayList<>();
        TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
        Schema schema = _segmentProcessorConfig.getSchema();
        String segmentNamePrefix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
        String segmentNamePostfix = _segmentProcessorConfig.getSegmentConfig().getSegmentNamePostfix();
        String fixedSegmentName = _segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();

        SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
        generatorConfig.setOutDir(_segmentsOutputDir.getPath());
        Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
        generatorConfig.setCreationTime(String.valueOf(_segmentProcessorConfig.getCustomCreationTime()));

        // Segment naming precedence: explicit generator from the processor config, then the generator
        // type declared in the table's indexing config, then plain prefix/postfix/fixed name.
        if (_segmentProcessorConfig.getSegmentNameGenerator() != null) {
            generatorConfig.setSegmentNameGenerator(_segmentProcessorConfig.getSegmentNameGenerator());
        } else if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
            generatorConfig.setSegmentNameGenerator(SegmentNameGeneratorFactory.createSegmentNameGenerator(
                    tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName, false));
        } else {
            generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
            generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
            generatorConfig.setSegmentName(fixedSegmentName);
        }

        for (Map.Entry<String, GenericRowFileManager> partition : map.entrySet()) {
            String partitionId = partition.getKey();
            GenericRowFileManager fileManager = partition.getValue();
            try {
                GenericRowFileReader fileReader = fileManager.getFileReader();
                int numRows = fileReader.getNumRows();
                LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId,
                        numRows, fileReader.getNumSortFields());
                GenericRowFileRecordReader partitionReader = fileReader.getRecordReader();

                int startRowId = 0;
                while (startRowId < numRows) {
                    // Asked for again on every segment because the provider is fed the size of the
                    // previous segment via updateSegmentInfo() below.
                    int rowsPerSegment = _segmentNumRowProvider.getNumRows();
                    // A non-positive value would never advance startRowId.
                    Preconditions.checkState(rowsPerSegment > 0,
                            "Number of rows per segment must be positive, got: %s", Integer.valueOf(rowsPerSegment));
                    // Widen to long so a very large rowsPerSegment cannot overflow the end row id.
                    int endRowId = (int) Math.min((long) startRowId + rowsPerSegment, (long) numRows);

                    LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}",
                            _segmentSequenceId, startRowId, endRowId);
                    observer.accept(String.format(
                            "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)",
                            _segmentSequenceId, partitionId, startRowId, endRowId, numRows));
                    generatorConfig.setSequenceId(_segmentSequenceId);

                    GenericRowFileRecordReader rangeReader =
                            partitionReader.getRecordReaderForRange(startRowId, endRowId);
                    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
                    driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(rangeReader),
                            TransformPipeline.getPassThroughPipeline());
                    driver.build();
                    outputSegmentDirs.add(driver.getOutputDirectory());
                    _segmentNumRowProvider.updateSegmentInfo(driver.getSegmentStats().getTotalDocCount(),
                            FileUtils.sizeOfDirectory(driver.getOutputDirectory()));

                    startRowId = endRowId;
                    _segmentSequenceId++;
                }
            } finally {
                fileManager.cleanUp();
            }
        }
        LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
        return outputSegmentDirs;
    }
}
