package org.apache.pinot.plugin.ingestion.batch.hadoop;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.class */
public class HadoopSegmentGenerationJobRunner extends Configured implements IngestionJobRunner, Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) HadoopSegmentGenerationJobRunner.class);
    public static final String SEGMENT_GENERATION_JOB_SPEC = "segmentGenerationJobSpec";
    private static final String DEPS_JAR_DIR_FIELD = "dependencyJarDir";
    private static final String STAGING_DIR_FIELD = "stagingDir";
    private static final String SEGMENT_TAR_SUBDIR_NAME = "segmentTar";
    private static final String DEPS_JAR_SUBDIR_NAME = "dependencyJars";
    private SegmentGenerationJobSpec _spec;

    public HadoopSegmentGenerationJobRunner() {
        setConf(new Configuration());
        getConf().set("mapreduce.job.user.classpath.first", "true");
    }

    public HadoopSegmentGenerationJobRunner(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this();
        init(segmentGenerationJobSpec);
    }

    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this._spec = segmentGenerationJobSpec;
        if (this._spec.getInputDirURI() == null) {
            throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getOutputDirURI() == null) {
            throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getRecordReaderSpec() == null) {
            throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec() == null) {
            throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec().getTableName() == null) {
            throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
        }
        if (this._spec.getTableSpec().getSchemaURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setSchemaURI(SegmentGenerationUtils.generateSchemaURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getTableSpec().getTableConfigURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
            this._spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap());
        }
    }

    public void run() throws Exception {
        for (PinotFSSpec pinotFSSpec : this._spec.getPinotFSSpecs()) {
            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
        }
        URI uri = new URI(this._spec.getInputDirURI());
        if (uri.getScheme() == null) {
            uri = new File(this._spec.getInputDirURI()).toURI();
        }
        List<String> listMatchedFilesWithRecursiveOption = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(PinotFSFactory.create(uri.getScheme()), uri, this._spec.getIncludeFileNamePattern(), this._spec.getExcludeFileNamePattern(), this._spec.isSearchRecursively());
        URI uri2 = new URI(this._spec.getOutputDirURI());
        if (uri2.getScheme() == null) {
            uri2 = new File(this._spec.getOutputDirURI()).toURI();
        }
        PinotFS create = PinotFSFactory.create(uri2.getScheme());
        create.mkdir(uri2);
        String str = (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR_FIELD);
        Preconditions.checkNotNull(str, "Please set config: stagingDir under 'executionFrameworkSpec.extraConfigs'");
        URI create2 = URI.create(str);
        if (create2.getScheme() == null) {
            create2 = new File(str).toURI();
        }
        if (!uri2.getScheme().equals(create2.getScheme())) {
            throw new RuntimeException(String.format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.", create2, uri2));
        }
        if (create.exists(create2)) {
            LOGGER.info("Clearing out existing staging directory: [{}]", create2);
            create.delete(create2, true);
        }
        create.mkdir(create2);
        Path path = new Path(create2.toString(), "input");
        create.mkdir(path.toUri());
        Path path2 = new Path(create2.toString(), SEGMENT_TAR_SUBDIR_NAME);
        create.mkdir(path2.toUri());
        int size = listMatchedFilesWithRecursiveOption.size();
        LOGGER.info("Creating segments with data files: {}", listMatchedFilesWithRecursiveOption);
        if (SegmentGenerationJobUtils.useGlobalDirectorySequenceId(this._spec.getSegmentNameGeneratorSpec())) {
            for (int i = 0; i < size; i++) {
                createInputFileUriAndSeqIdFile(SegmentGenerationUtils.getFileURI((String) listMatchedFilesWithRecursiveOption.get(i), uri), create, path, i);
            }
        } else {
            HashMap hashMap = new HashMap();
            for (String str2 : listMatchedFilesWithRecursiveOption) {
                java.nio.file.Path parent = Paths.get(str2, new String[0]).getParent();
                if (!hashMap.containsKey(parent.toString())) {
                    hashMap.put(parent.toString(), new ArrayList());
                }
                ((List) hashMap.get(parent.toString())).add(str2);
            }
            for (String str3 : hashMap.keySet()) {
                List list = (List) hashMap.get(str3);
                Collections.sort(list);
                for (int i2 = 0; i2 < list.size(); i2++) {
                    createInputFileUriAndSeqIdFile(SegmentGenerationUtils.getFileURI((String) list.get(i2), SegmentGenerationUtils.getDirectoryURI(str3)), create, path, i2);
                }
            }
        }
        try {
            Job job = Job.getInstance(getConf());
            job.setJobName(getClass().getSimpleName());
            job.setJarByClass(SegmentGenerationJobSpec.class);
            job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
            addMapperJarToDistributedCache(job, create, create2);
            Configuration configuration = job.getConfiguration();
            String str4 = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
            if (str4 != null) {
                configuration.set("mapreduce.job.credentials.binary", str4);
            }
            int segmentCreationJobParallelism = this._spec.getSegmentCreationJobParallelism();
            if (segmentCreationJobParallelism <= 0 || segmentCreationJobParallelism > size) {
                segmentCreationJobParallelism = size;
            }
            configuration.setInt("mapreduce.job.maps", segmentCreationJobParallelism);
            packPluginsToDistributedCache(job, create, create2);
            String str5 = (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR_FIELD);
            if (str5 != null) {
                addJarsToDistributedCache(job, new File(str5), create, new Path(create2.toString(), DEPS_JAR_SUBDIR_NAME).toUri(), false);
            }
            this._spec.setOutputDirURI(path2.toUri().toString());
            configuration.set(SEGMENT_GENERATION_JOB_SPEC, new Yaml().dump(this._spec));
            this._spec.setOutputDirURI(uri2.toString());
            job.setMapperClass(getMapperClass());
            job.setNumReduceTasks(0);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, path);
            FileOutputFormat.setOutputPath(job, new Path(str, "output"));
            job.waitForCompletion(true);
            if (!job.isSuccessful()) {
                throw new RuntimeException("Job failed: " + job);
            }
            LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", create2, uri2);
            moveFiles(create, new Path(str, SEGMENT_TAR_SUBDIR_NAME).toUri(), uri2, this._spec.isOverwriteOutput());
            LOGGER.info("Trying to clean up staging directory: [{}]", create2);
            create.delete(create2, true);
        } catch (Throwable th) {
            LOGGER.info("Trying to clean up staging directory: [{}]", create2);
            create.delete(create2, true);
            throw th;
        }
    }

    private void createInputFileUriAndSeqIdFile(URI uri, PinotFS pinotFS, Path path, int i) throws Exception {
        File createTempFile = File.createTempFile("pinot-filepath-", ".txt");
        DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(createTempFile));
        try {
            dataOutputStream.write((uri + " " + i).getBytes(StandardCharsets.UTF_8));
            dataOutputStream.flush();
            pinotFS.copyFromLocalFile(createTempFile, new Path(path, Integer.toString(i)).toUri());
            dataOutputStream.close();
        } catch (Throwable th) {
            try {
                dataOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void moveFiles(PinotFS pinotFS, URI uri, URI uri2, boolean z) throws IOException, URISyntaxException {
        for (String str : pinotFS.listFiles(uri, true)) {
            URI fileURI = SegmentGenerationUtils.getFileURI(str, uri);
            URI resolve = SegmentGenerationUtils.getRelativeOutputPath(uri, fileURI, uri2).resolve(SegmentGenerationUtils.getFileName(fileURI));
            if (z || !pinotFS.exists(resolve)) {
                pinotFS.move(fileURI, resolve, true);
            } else {
                LOGGER.warn("Can't overwrite existing output segment tar file: {}", resolve);
            }
        }
    }

    protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
        return HadoopSegmentCreationMapper.class;
    }

    protected void addMapperJarToDistributedCache(Job job, PinotFS pinotFS, URI uri) throws Exception {
        File file = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
        Path path = new Path(uri.toString(), file.getName());
        pinotFS.copyFromLocalDir(file, path.toUri());
        job.addFileToClassPath(path);
    }

    protected void packPluginsToDistributedCache(Job job, PinotFS pinotFS, URI uri) {
        String[] pluginsDirectories = PluginManager.get().getPluginsDirectories();
        if (pluginsDirectories == null) {
            LOGGER.warn("Plugin directories is null, nothing to pack to distributed cache");
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (String str : pluginsDirectories) {
            File file = new File(str);
            if (!file.exists()) {
                LOGGER.warn("Cannot find Pinot plugins directory at [{}]", str);
                return;
            }
            arrayList.add(file);
        }
        File file2 = new File("pinot-plugins.tar.gz");
        try {
            TarGzCompressionUtils.createTarGzFile((File[]) arrayList.toArray(new File[0]), file2);
            Path path = new Path(uri.toString(), "pinot-plugins.tar.gz");
            pinotFS.copyFromLocalFile(file2, path.toUri());
            job.addCacheFile(path.toUri());
            String property = System.getProperty("plugins.include");
            if (property != null) {
                job.getConfiguration().set("plugins.include", property);
            }
        } catch (Exception e) {
            LOGGER.error("Failed to tar plugins directories and upload to staging dir", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    protected void addJarsToDistributedCache(Job job, File file, PinotFS pinotFS, URI uri, boolean z) throws Exception {
        if (!file.exists()) {
            LOGGER.warn("No jars directory at [{}]", file);
            return;
        }
        Path path = new Path(uri);
        for (File file2 : FileUtils.listFiles(file, new String[]{"jar"}, z)) {
            LOGGER.info("Adding jar {} to distributed cache", file2);
            Path path2 = new Path(path, file2.getName());
            pinotFS.copyFromLocalFile(file2, path2.toUri());
            job.addFileToClassPath(path2);
        }
    }
}
