package org.apache.pinot.core.segment.processing.framework;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
import org.apache.pinot.core.segment.processing.mapper.SegmentMapper;
import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.class */
/**
 * Drives segment processing over a set of record readers in three phases:
 * <ol>
 *   <li>map: {@link SegmentMapper} turns the record readers into one {@link GenericRowFileManager} per partition;</li>
 *   <li>reduce: the reducer obtained from {@link ReducerFactory} replaces each partition's file manager with its
 *       reduced output;</li>
 *   <li>segment creation: each partition is cut into row ranges of at most {@code maxNumRecordsPerSegment} rows and
 *       one segment is built per range.</li>
 * </ol>
 * All files are written below the working directory handed to the constructor, in the sub-directories
 * {@code mapper_output}, {@code reducer_output} and {@code segments_output}.
 */
public class SegmentProcessorFramework {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);

    private final List<RecordReader> _recordReaders;
    private final SegmentProcessorConfig _segmentProcessorConfig;
    private final File _mapperOutputDir;
    private final File _reducerOutputDir;
    private final File _segmentsOutputDir;

    /**
     * Creates the framework and the three output sub-directories it works in.
     *
     * @param list record readers to process; must not be empty
     * @param segmentProcessorConfig configuration used by every phase
     * @param file working directory under which {@code mapper_output}, {@code reducer_output} and
     *     {@code segments_output} are created
     * @throws IllegalStateException if {@code list} is empty
     * @throws IOException if one of the sub-directories cannot be created
     */
    public SegmentProcessorFramework(List<RecordReader> list, SegmentProcessorConfig segmentProcessorConfig, File file) throws IOException {
        Preconditions.checkState(!list.isEmpty(), "No record reader is provided");
        LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}", list.size(), segmentProcessorConfig, file.getAbsolutePath());
        this._recordReaders = list;
        this._segmentProcessorConfig = segmentProcessorConfig;
        this._mapperOutputDir = new File(file, "mapper_output");
        FileUtils.forceMkdir(this._mapperOutputDir);
        this._reducerOutputDir = new File(file, "reducer_output");
        FileUtils.forceMkdir(this._reducerOutputDir);
        this._segmentsOutputDir = new File(file, "segments_output");
        FileUtils.forceMkdir(this._segmentsOutputDir);
    }

    /**
     * Runs the map, reduce and segment creation phases.
     *
     * <p>The mapper and reducer output directories are intermediate state and are removed whether or not processing
     * succeeds. On failure they are removed quietly so that a cleanup problem cannot replace the exception that
     * caused the failure. The segments output directory is left untouched in both cases.
     *
     * @return output directories of the created segments; an empty list if the map phase produced no partition
     * @throws Exception if any phase fails, or if the intermediate directories cannot be deleted after a
     *     successful run
     */
    public List<File> process() throws Exception {
        List<File> segmentDirs;
        try {
            segmentDirs = runPhases();
        } catch (Throwable t) {
            FileUtils.deleteQuietly(this._mapperOutputDir);
            FileUtils.deleteQuietly(this._reducerOutputDir);
            throw t;
        }
        FileUtils.deleteDirectory(this._mapperOutputDir);
        FileUtils.deleteDirectory(this._reducerOutputDir);
        return segmentDirs;
    }

    /** Executes the three phases in order and returns the output directories of the created segments. */
    private List<File> runPhases() throws Exception {
        LOGGER.info("Beginning map phase on {} record readers", this._recordReaders.size());
        Map<String, GenericRowFileManager> partitions = new SegmentMapper(this._recordReaders, this._segmentProcessorConfig, this._mapperOutputDir).map();
        if (partitions.isEmpty()) {
            LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
            return Collections.emptyList();
        }

        LOGGER.info("Beginning reduce phase on partitions: {}", partitions.keySet());
        // NOTE(review): if a reducer throws, the file managers still held in the map are not released here because
        // it is not visible from this class whether the reducer already released its input; only their directories
        // are removed by process(). Confirm ownership against the reducer implementations.
        for (Map.Entry<String, GenericRowFileManager> partition : partitions.entrySet()) {
            partition.setValue(ReducerFactory.getReducer(partition.getKey(), partition.getValue(), this._segmentProcessorConfig, this._reducerOutputDir).reduce());
        }

        LOGGER.info("Beginning segment creation phase on partitions: {}", partitions.keySet());
        List<File> segmentDirs = createSegments(partitions);
        LOGGER.info("Successfully created segments: {}", segmentDirs);
        return segmentDirs;
    }

    /**
     * Builds the segments of every partition and releases each partition's file manager once it has been consumed.
     * If building fails part-way, the file managers that were not yet released are released on a best-effort basis.
     */
    private List<File> createSegments(Map<String, GenericRowFileManager> partitions) throws Exception {
        int maxNumRecordsPerSegment = this._segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
        // A non-positive limit would never advance the row range below and loop forever.
        Preconditions.checkState(maxNumRecordsPerSegment > 0, "maxNumRecordsPerSegment must be positive, got: %s", maxNumRecordsPerSegment);

        SegmentGeneratorConfig generatorConfig = newSegmentGeneratorConfig();
        List<File> segmentDirs = new ArrayList<>();
        // The sequence id keeps increasing across partitions so every created segment gets a distinct one.
        int sequenceId = 0;
        int numReleased = 0;
        boolean completed = false;
        try {
            for (Map.Entry<String, GenericRowFileManager> partition : partitions.entrySet()) {
                GenericRowFileManager fileManager = partition.getValue();
                sequenceId = createSegmentsForPartition(partition.getKey(), fileManager, generatorConfig, maxNumRecordsPerSegment, sequenceId, segmentDirs);
                // Counted before the call so that a cleanUp() which itself fails is not attempted a second time.
                numReleased++;
                fileManager.cleanUp();
            }
            completed = true;
        } finally {
            if (!completed) {
                // The map is not structurally modified, so re-iterating visits the partitions in the same order
                // and the first numReleased of them are the ones already handled above.
                int index = 0;
                for (GenericRowFileManager fileManager : partitions.values()) {
                    if (index >= numReleased) {
                        cleanUpQuietly(fileManager);
                    }
                    index++;
                }
            }
        }
        return segmentDirs;
    }

    /**
     * Builds one segment per row range of the given partition and appends each segment's output directory to
     * {@code segmentDirs}.
     *
     * @return the next unused sequence id
     */
    private int createSegmentsForPartition(String partitionId, GenericRowFileManager fileManager, SegmentGeneratorConfig generatorConfig, int maxNumRecordsPerSegment, int firstSequenceId, List<File> segmentDirs) throws Exception {
        GenericRowFileReader fileReader = fileManager.getFileReader();
        int numRows = fileReader.getNumRows();
        LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows, fileReader.getNumSortFields());
        GenericRowFileRecordReader recordReader = fileReader.getRecordReader();

        int sequenceId = firstSequenceId;
        int startRowId = 0;
        while (startRowId < numRows) {
            // Add in long arithmetic: startRowId + maxNumRecordsPerSegment can exceed Integer.MAX_VALUE.
            int endRowId = (int) Math.min((long) startRowId + maxNumRecordsPerSegment, (long) numRows);
            LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId, endRowId);
            generatorConfig.setSequenceId(sequenceId);
            GenericRowFileRecordReader rangeReader = recordReader.getRecordReaderForRange(startRowId, endRowId);
            SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
            driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(rangeReader), TransformPipeline.getPassThroughPipeline());
            driver.build();
            segmentDirs.add(driver.getOutputDirectory());
            // The next range starts where this one ended; this also ends the loop once endRowId reaches numRows.
            startRowId = endRowId;
            sequenceId++;
        }
        return sequenceId;
    }

    /**
     * Creates the generator config shared by all segments: it writes to the segments output directory and names
     * segments either through the table's configured name generator or through the configured prefix/postfix.
     */
    private SegmentGeneratorConfig newSegmentGeneratorConfig() {
        TableConfig tableConfig = this._segmentProcessorConfig.getTableConfig();
        Schema schema = this._segmentProcessorConfig.getSchema();
        String segmentNamePrefix = this._segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix();
        String segmentNamePostfix = this._segmentProcessorConfig.getSegmentConfig().getSegmentNamePostfix();
        String fixedSegmentName = this._segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();

        SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
        generatorConfig.setOutDir(this._segmentsOutputDir.getPath());
        if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
            generatorConfig.setSegmentNameGenerator(SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName, false));
        } else {
            generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
            generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
        }
        return generatorConfig;
    }

    /** Releases a file manager on a failure path, logging instead of throwing so the original error is preserved. */
    private static void cleanUpQuietly(GenericRowFileManager fileManager) {
        try {
            fileManager.cleanUp();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while cleaning up file manager after a processing failure", e);
        }
    }
}
