package org.apache.pinot.hadoop.job.mappers;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.index.IndexFileNames;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.hadoop.job.InternalConfigConstants;
import org.apache.pinot.ingestion.common.JobConfigConstants;
import org.apache.pinot.ingestion.jobs.SegmentCreationJob;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.class */
/**
 * Hadoop mapper that turns one input line into one Pinot segment.
 *
 * <p>Each input value must have the form {@code "<inputFilePath> <sequenceId>"} (single-space separated). For every
 * value the mapper:
 * <ol>
 *   <li>copies the input file into a local staging directory;</li>
 *   <li>builds a segment from it with {@link SegmentIndexCreationDriverImpl};</li>
 *   <li>tars the segment;</li>
 *   <li>moves the tarball to the task's work output path;</li>
 *   <li>emits {@code (sequenceId, tarFileName)}.</li>
 * </ol>
 *
 * <p>Schema mismatches reported by the driver's {@link IngestionSchemaValidator} are counted. The counts are published
 * as job counters in {@link #cleanup}. The task fails on a mismatch only when the table custom config sets
 * {@link InternalConfigConstants#FAIL_ON_SCHEMA_MISMATCH} to {@code true}.
 */
public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    protected static final String LOCAL_TEMP_DIR = "pinot_hadoop_tmp";
    protected static final String PROGRESS_REPORTER_THREAD_NAME = "pinot-hadoop-progress-reporter";
    protected static final long PROGRESS_REPORTER_JOIN_WAIT_TIME_MS = 5000;
    /** Local sub-directory (under the staging dir) that receives the untarred segments. */
    private static final String LOCAL_SEGMENT_DIR_NAME = "segments";

    protected Configuration _jobConf;
    protected String _rawTableName;
    protected Schema _schema;
    protected SegmentNameGenerator _segmentNameGenerator;
    protected TableConfig _tableConfig;
    protected String _recordReaderPath;
    protected Path _readerConfigFile;
    protected Path _hdfsSegmentTarDir;
    protected File _localStagingDir;
    protected File _localInputDir;
    protected File _localSegmentDir;
    protected File _localSegmentTarDir;
    protected final Logger _logger = LoggerFactory.getLogger(getClass());
    protected boolean _useRelativePath = false;
    protected boolean _failIfSchemaMismatch = false;

    // Per-mapper schema mismatch counts; published as job counters in cleanup().
    private int _dataTypeMismatch = 0;
    private int _singleValueMultiValueFieldMismatch = 0;
    private int _multiValueStructureMismatch = 0;
    private int _missingPinotColumn = 0;

    /**
     * Runnable that calls {@code context.progress()} every {@link #PROGRESS_REPORTER_INTERVAL_MS} ms until its thread
     * is interrupted. It is used while a segment is being built, so the framework keeps seeing progress from the task.
     */
    public static class ProgressReporter implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProgressReporter.class);
        private static final long PROGRESS_REPORTER_INTERVAL_MS = 60000;
        private final Mapper<LongWritable, Text, LongWritable, Text>.Context _context;

        ProgressReporter(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
            _context = context;
        }

        @Override
        public void run() {
            LOGGER.info("Starting progress reporter thread: {}", Thread.currentThread());
            while (true) {
                try {
                    Thread.sleep(PROGRESS_REPORTER_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal; the thread exits, so there is no need to restore the flag.
                    LOGGER.info("Progress reporter thread: {} interrupted", Thread.currentThread());
                    return;
                }
                LOGGER.info("============== Reporting progress ==============");
                _context.progress();
            }
        }
    }

    /**
     * Computes the output directory for an input file so that the input directory layout is mirrored in the output.
     *
     * @param baseInputDir URI of the job's base input path
     * @param inputFile URI of an input file located under {@code baseInputDir}
     * @param outputDir directory under which the layout is mirrored
     * @return parent of {@code outputDir/<inputFile path relative to baseInputDir>}
     * @throws IllegalStateException if {@code inputFile} is not strictly under {@code baseInputDir}.
     *     {@link URI#relativize} returns its argument unchanged when the base is not a prefix.
     */
    protected static Path getRelativeOutputPath(URI baseInputDir, URI inputFile, Path outputDir) {
        URI relativePath = baseInputDir.relativize(inputFile);
        Preconditions.checkState(relativePath.getPath().length() > 0 && !relativePath.equals(inputFile),
                "Unable to extract out the relative path based on base input path: %s", baseInputDir);
        return new Path(outputDir, relativePath.getPath()).getParent();
    }

    /**
     * Reads table, schema, reader and naming settings from the job configuration, then recreates the local staging
     * directories from scratch.
     */
    @Override
    public void setup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
        _jobConf = context.getConfiguration();
        logConfigurations();
        _useRelativePath = _jobConf.getBoolean(JobConfigConstants.USE_RELATIVE_PATH, false);
        _rawTableName = _jobConf.get("segment.table.name");
        _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
        String tableConfigJson = _jobConf.get(JobConfigConstants.TABLE_CONFIG);
        if (tableConfigJson != null) {
            _tableConfig = (TableConfig) JsonUtils.stringToObject(tableConfigJson, TableConfig.class);
        }
        String readerConfigPath = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
        if (readerConfigPath != null) {
            _readerConfigFile = new Path(readerConfigPath);
        }
        _recordReaderPath = _jobConf.get(JobConfigConstants.RECORD_READER_PATH);
        Preconditions.checkNotNull(_tableConfig, "Table config must be provided in the job configuration");
        setFlagForSchemaMismatch();
        _segmentNameGenerator = createSegmentNameGenerator();

        _hdfsSegmentTarDir = new Path(FileOutputFormat.getWorkOutputPath(context), JobConfigConstants.SEGMENT_TAR_DIR);
        _localStagingDir = new File(LOCAL_TEMP_DIR);
        _localInputDir = new File(_localStagingDir, "inputData");
        _localSegmentDir = new File(_localStagingDir, LOCAL_SEGMENT_DIR_NAME);
        _localSegmentTarDir = new File(_localStagingDir, JobConfigConstants.SEGMENT_TAR_DIR);
        if (_localStagingDir.exists()) {
            _logger.warn("Deleting existing file: {}", _localStagingDir);
            FileUtils.forceDelete(_localStagingDir);
        }
        _logger.info("Making local temporary directories: {}, {}, {}", _localStagingDir, _localInputDir, _localSegmentTarDir);
        Preconditions.checkState(_localStagingDir.mkdirs(), "Failed to create directory: %s", _localStagingDir);
        Preconditions.checkState(_localInputDir.mkdir(), "Failed to create directory: %s", _localInputDir);
        Preconditions.checkState(_localSegmentDir.mkdir(), "Failed to create directory: %s", _localSegmentDir);
        Preconditions.checkState(_localSegmentTarDir.mkdir(), "Failed to create directory: %s", _localSegmentTarDir);

        _logger.info("*********************************************************************");
        _logger.info("Raw Table Name: {}", _rawTableName);
        _logger.info("Schema: {}", _schema);
        _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
        _logger.info("Table Config: {}", _tableConfig);
        _logger.info("Reader Config File: {}", _readerConfigFile);
        _logger.info("*********************************************************************");
        _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
        _logger.info("Local Staging Directory: {}", _localStagingDir);
        _logger.info("Local Input Directory: {}", _localInputDir);
        _logger.info("Local Segment Tar Directory: {}", _localSegmentTarDir);
        _logger.info("*********************************************************************");
    }

    /**
     * Builds the segment name generator selected by {@link JobConfigConstants#SEGMENT_NAME_GENERATOR_TYPE}.
     * The default is {@code "simple"}.
     *
     * @throws UnsupportedOperationException for an unknown generator type
     */
    private SegmentNameGenerator createSegmentNameGenerator() {
        String generatorType = _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, "simple");
        switch (generatorType) {
            case "simple":
                return new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
            case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR: {
                Preconditions.checkState(_tableConfig != null,
                        "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
                // The time column's format spec is optional; the generator accepts null when it is unknown.
                DateTimeFormatSpec dateTimeFormatSpec = null;
                String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName();
                if (timeColumnName != null) {
                    DateTimeFieldSpec timeFieldSpec = _schema.getSpecForTimeColumn(timeColumnName);
                    if (timeFieldSpec != null) {
                        dateTimeFormatSpec = timeFieldSpec.getFormatSpec();
                    }
                }
                return new NormalizedDateSegmentNameGenerator(_rawTableName,
                        _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
                        _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
                        IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
                        IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig), dateTimeFormatSpec,
                        _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
            }
            default:
                throw new UnsupportedOperationException("Unsupported segment name generator type: " + generatorType);
        }
    }

    /**
     * Logs every job configuration entry as {@code {key=value, ...}}.
     *
     * <p>NOTE(review): entries are logged verbatim. Confirm that no secrets (e.g. filesystem credentials) reach the
     * job configuration.
     */
    protected void logConfigurations() {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> entry : _jobConf) {
            if (!first) {
                sb.append(", ");
            }
            first = false;
            sb.append(entry.getKey()).append('=').append(entry.getValue());
        }
        sb.append('}');
        _logger.info("*********************************************************************");
        _logger.info("Job Configurations: {}", sb);
        _logger.info("*********************************************************************");
    }

    /**
     * Builds, tars and uploads one segment from the input described by {@code value}.
     *
     * @param key ignored
     * @param value {@code "<inputFilePath> <sequenceId>"}
     * @throws IllegalStateException if {@code value} does not have exactly two space-separated tokens
     * @throws RuntimeException wrapping any failure during segment creation, tarring or upload
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
            throws IOException, InterruptedException {
        String[] tokens = StringUtils.split(value.toString(), ' ');
        // Guava message templates use %s, not SLF4J-style {}.
        Preconditions.checkState(tokens.length == 2, "Illegal input value: %s", value);
        Path hdfsInputFile = new Path(tokens[0]);
        int sequenceId = Integer.parseInt(tokens[1]);
        _logger.info("Generating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile, sequenceId);

        String inputFileName = hdfsInputFile.getName();
        File localInputFile = new File(_localInputDir, inputFileName);
        _logger.info("Copying input file from: {} to: {}", hdfsInputFile, localInputFile);
        FileSystem.get(hdfsInputFile.toUri(), _jobConf)
                .copyToLocalFile(hdfsInputFile, new Path(localInputFile.getAbsolutePath()));

        SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
        segmentGeneratorConfig.setTableName(_rawTableName);
        segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
        segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
        segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
        segmentGeneratorConfig.setSequenceId(sequenceId);
        if (_recordReaderPath != null) {
            // A custom record reader class is configured; the format is then opaque to Pinot.
            segmentGeneratorConfig.setRecordReaderPath(_recordReaderPath);
            segmentGeneratorConfig.setFormat(FileFormat.OTHER);
        } else {
            FileFormat fileFormat = getFileFormat(inputFileName);
            segmentGeneratorConfig.setFormat(fileFormat);
            segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
        }
        segmentGeneratorConfig.setOnHeap(true);
        segmentGeneratorConfig.setFailOnEmptySegment(true);
        addAdditionalSegmentGeneratorConfigs(segmentGeneratorConfig, hdfsInputFile, sequenceId);

        _logger.info("Start creating segment with sequence id: {}", sequenceId);
        SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
        Thread progressReporterThread = new Thread(getProgressReporter(context), PROGRESS_REPORTER_THREAD_NAME);
        // Daemon so that a reporter which fails to stop cannot keep the task JVM alive.
        progressReporterThread.setDaemon(true);
        progressReporterThread.start();
        try {
            driver.init(segmentGeneratorConfig);
            validateSchema(driver.getIngestionSchemaValidator());
            driver.build();
            String segmentName = driver.getSegmentName();
            _logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);

            File segmentDir = new File(_localSegmentDir, segmentName);
            String segmentTarFileName = segmentName + ".tar.gz";
            File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
            _logger.info("Tarring segment from: {} to: {}", segmentDir, localSegmentTarFile);
            TarGzCompressionUtils.createTarGzFile(segmentDir, localSegmentTarFile);
            _logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
                    DataSizeUtils.fromBytes(FileUtils.sizeOf(segmentDir)),
                    DataSizeUtils.fromBytes(FileUtils.sizeOf(localSegmentTarFile)));

            Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
            if (_useRelativePath) {
                // Mirror the input directory layout under the segment tar output directory.
                URI baseInputUri = new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri();
                hdfsSegmentTarFile = new Path(
                        getRelativeOutputPath(baseInputUri, hdfsInputFile.toUri(), _hdfsSegmentTarDir), segmentTarFileName);
            }
            _logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
            // delSrc=true, overwrite=true: the local tarball is removed once uploaded.
            FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
                    .copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);

            context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
            _logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName,
                    hdfsInputFile, sequenceId);
        } catch (Exception e) {
            _logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}",
                    hdfsInputFile, sequenceId, e);
            throw new RuntimeException(e);
        } finally {
            stopProgressReporter(progressReporterThread);
        }
    }

    /**
     * Interrupts the progress reporter thread and waits up to {@link #PROGRESS_REPORTER_JOIN_WAIT_TIME_MS} ms for it
     * to exit. Logs an error if the thread is still alive after the wait.
     */
    private void stopProgressReporter(Thread progressReporterThread) throws InterruptedException {
        progressReporterThread.interrupt();
        progressReporterThread.join(PROGRESS_REPORTER_JOIN_WAIT_TIME_MS);
        if (progressReporterThread.isAlive()) {
            _logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
        }
    }

    /**
     * Infers the input file format from the file name extension.
     *
     * @throws IllegalArgumentException for an unrecognized extension
     */
    protected FileFormat getFileFormat(String fileName) {
        if (fileName.endsWith(".avro")) {
            return FileFormat.AVRO;
        }
        if (fileName.endsWith(".csv")) {
            return FileFormat.CSV;
        }
        if (fileName.endsWith(".json")) {
            return FileFormat.JSON;
        }
        if (fileName.endsWith(".thrift")) {
            return FileFormat.THRIFT;
        }
        throw new IllegalArgumentException("Unsupported file format: " + fileName);
    }

    /**
     * Loads the format-specific record reader config from {@link #_readerConfigFile}.
     *
     * @return the parsed config for CSV, THRIFT or PROTO; {@code null} if no reader config file is configured or the
     *     format has no reader config
     * @throws IOException if the config file cannot be opened or parsed
     */
    @Nullable
    protected RecordReaderConfig getReaderConfig(FileFormat fileFormat) throws IOException {
        if (_readerConfigFile == null) {
            return null;
        }
        if (fileFormat == FileFormat.CSV) {
            CSVRecordReaderConfig config = readReaderConfig(CSVRecordReaderConfig.class);
            _logger.info("Using CSV record reader config: {}", config);
            return config;
        }
        if (fileFormat == FileFormat.THRIFT) {
            ThriftRecordReaderConfig config = readReaderConfig(ThriftRecordReaderConfig.class);
            _logger.info("Using Thrift record reader config: {}", config);
            return config;
        }
        if (fileFormat == FileFormat.PROTO) {
            ProtoBufRecordReaderConfig config = readReaderConfig(ProtoBufRecordReaderConfig.class);
            _logger.info("Using Protocol Buffer record reader config: {}", config);
            return config;
        }
        return null;
    }

    /** Deserializes {@link #_readerConfigFile} as JSON into {@code configClass}, always closing the stream. */
    private <T> T readReaderConfig(Class<T> configClass) throws IOException {
        try (FSDataInputStream inputStream =
                FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
            return JsonUtils.inputStreamToObject(inputStream, configClass);
        }
    }

    /** Returns the runnable executed on the progress reporter thread during segment creation. */
    protected Runnable getProgressReporter(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
        return new ProgressReporter(context);
    }

    /** Hook for subclasses to customize the segment generator config; does nothing by default. */
    protected void addAdditionalSegmentGeneratorConfigs(SegmentGeneratorConfig segmentGeneratorConfig, Path hdfsInputFile,
            int sequenceId) {
    }

    /**
     * Sets {@link #_failIfSchemaMismatch} from the table custom config key
     * {@link InternalConfigConstants#FAIL_ON_SCHEMA_MISMATCH}. The default is {@code false}.
     */
    private void setFlagForSchemaMismatch() {
        TableCustomConfig customConfig = _tableConfig.getCustomConfig();
        if (customConfig == null) {
            _logger.info("Set NOT to fail the job if schemas mismatch.");
            return;
        }
        Map<String, String> customConfigs = customConfig.getCustomConfigs();
        if (customConfigs != null && customConfigs.containsKey(InternalConfigConstants.FAIL_ON_SCHEMA_MISMATCH)) {
            _failIfSchemaMismatch = Boolean.parseBoolean(customConfigs.get(InternalConfigConstants.FAIL_ON_SCHEMA_MISMATCH));
        }
        _logger.info(_failIfSchemaMismatch ? "Set to fail the job if schemas mismatch." : "Set NOT to fail the job if schemas mismatch.");
    }

    /**
     * Counts and logs each kind of schema mismatch detected by {@code validator}.
     *
     * @throws RuntimeException if any mismatch has been seen and {@link #_failIfSchemaMismatch} is set
     */
    private void validateSchema(@Nullable IngestionSchemaValidator validator) {
        if (validator == null) {
            return;
        }
        if (validator.getDataTypeMismatchResult().isMismatchDetected()) {
            _dataTypeMismatch++;
            _logger.warn(validator.getDataTypeMismatchResult().getMismatchReason());
        }
        if (validator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
            _singleValueMultiValueFieldMismatch++;
            _logger.warn(validator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
        }
        if (validator.getMultiValueStructureMismatchResult().isMismatchDetected()) {
            _multiValueStructureMismatch++;
            _logger.warn(validator.getMultiValueStructureMismatchResult().getMismatchReason());
        }
        if (validator.getMissingPinotColumnResult().isMismatchDetected()) {
            _missingPinotColumn++;
            _logger.warn(validator.getMissingPinotColumnResult().getMismatchReason());
        }
        if (isSchemaMismatch() && _failIfSchemaMismatch) {
            throw new RuntimeException("Schema mismatch detected. Forcing to fail the job. Please checking log message above.");
        }
    }

    /** Returns whether any schema mismatch has been counted by this mapper so far. */
    private boolean isSchemaMismatch() {
        return _dataTypeMismatch + _singleValueMultiValueFieldMismatch + _multiValueStructureMismatch + _missingPinotColumn != 0;
    }

    /** Publishes the schema mismatch counters and deletes the local staging directory. */
    @Override
    public void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context) {
        context.getCounter(SegmentCreationJob.SchemaMisMatchCounter.DATA_TYPE_MISMATCH).increment(_dataTypeMismatch);
        context.getCounter(SegmentCreationJob.SchemaMisMatchCounter.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH).increment(_singleValueMultiValueFieldMismatch);
        context.getCounter(SegmentCreationJob.SchemaMisMatchCounter.MULTI_VALUE_FIELD_STRUCTURE_MISMATCH).increment(_multiValueStructureMismatch);
        context.getCounter(SegmentCreationJob.SchemaMisMatchCounter.MISSING_PINOT_COLUMN).increment(_missingPinotColumn);
        _logger.info("Deleting local temporary directory: {}", _localStagingDir);
        FileUtils.deleteQuietly(_localStagingDir);
    }
}
