package org.apache.pinot.plugin.ingestion.batch.spark3;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.CharEncoding;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.class */
public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Serializable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SparkSegmentGenerationJobRunner.class);
    private static final String DEPS_JAR_DIR = "dependencyJarDir";
    private static final String STAGING_DIR = "stagingDir";
    private SegmentGenerationJobSpec _spec;

    public SparkSegmentGenerationJobRunner() {
    }

    public SparkSegmentGenerationJobRunner(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        init(segmentGenerationJobSpec);
    }

    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this._spec = segmentGenerationJobSpec;
        if (this._spec.getInputDirURI() == null) {
            throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getOutputDirURI() == null) {
            throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
        }
        if (this._spec.getRecordReaderSpec() == null) {
            throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec() == null) {
            throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
        }
        if (this._spec.getTableSpec().getTableName() == null) {
            throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
        }
        if (this._spec.getTableSpec().getSchemaURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setSchemaURI(SegmentGenerationUtils.generateSchemaURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getTableSpec().getTableConfigURI() == null) {
            if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
            }
            this._spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
        }
        if (this._spec.getExecutionFrameworkSpec().getExtraConfigs() == null) {
            this._spec.getExecutionFrameworkSpec().setExtraConfigs(new HashMap());
        }
    }

    /* JADX WARN: Finally extract failed */
    public void run() throws Exception {
        for (PinotFSSpec pinotFSSpec : this._spec.getPinotFSSpecs()) {
            PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
        }
        URI uri = new URI(this._spec.getInputDirURI());
        if (uri.getScheme() == null) {
            uri = new File(this._spec.getInputDirURI()).toURI();
        }
        List<String> listMatchedFilesWithRecursiveOption = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(PinotFSFactory.create(uri.getScheme()), uri, this._spec.getIncludeFileNamePattern(), this._spec.getExcludeFileNamePattern(), this._spec.isSearchRecursively());
        LOGGER.info("Found {} files to create Pinot segments!", Integer.valueOf(listMatchedFilesWithRecursiveOption.size()));
        URI uri2 = new URI(this._spec.getOutputDirURI());
        if (uri2.getScheme() == null) {
            uri2 = new File(this._spec.getOutputDirURI()).toURI();
        }
        PinotFS create = PinotFSFactory.create(uri2.getScheme());
        create.mkdir(uri2);
        String str = (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
        URI uri3 = null;
        if (str != null) {
            uri3 = URI.create(str);
            if (uri3.getScheme() == null) {
                uri3 = new File(str).toURI();
            }
            if (!uri2.getScheme().equals(uri3.getScheme())) {
                throw new RuntimeException(String.format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.", uri3, uri2));
            }
            create.mkdir(uri3);
        }
        try {
            JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
            packPluginsToDistributedCache(fromSparkContext);
            if (this._spec.getExecutionFrameworkSpec().getExtraConfigs().containsKey(DEPS_JAR_DIR)) {
                addDepsJarToDistributedCache(fromSparkContext, (String) this._spec.getExecutionFrameworkSpec().getExtraConfigs().get(DEPS_JAR_DIR));
            }
            ArrayList arrayList = new ArrayList();
            if (SegmentGenerationJobUtils.useGlobalDirectorySequenceId(this._spec.getSegmentNameGeneratorSpec())) {
                for (int i = 0; i < listMatchedFilesWithRecursiveOption.size(); i++) {
                    arrayList.add(String.format("%s %d", listMatchedFilesWithRecursiveOption.get(i), Integer.valueOf(i)));
                }
            } else {
                HashMap hashMap = new HashMap();
                for (String str2 : listMatchedFilesWithRecursiveOption) {
                    Path parent = Paths.get(str2, new String[0]).getParent();
                    if (!hashMap.containsKey(parent.toString())) {
                        hashMap.put(parent.toString(), new ArrayList());
                    }
                    ((List) hashMap.get(parent.toString())).add(str2);
                }
                Iterator it = hashMap.keySet().iterator();
                while (it.hasNext()) {
                    List list = (List) hashMap.get((String) it.next());
                    Collections.sort(list);
                    for (int i2 = 0; i2 < list.size(); i2++) {
                        arrayList.add(String.format("%s %d", list.get(i2), Integer.valueOf(i2)));
                    }
                }
            }
            int size = arrayList.size();
            int segmentCreationJobParallelism = this._spec.getSegmentCreationJobParallelism();
            if (segmentCreationJobParallelism <= 0 || segmentCreationJobParallelism > size) {
                segmentCreationJobParallelism = size;
            }
            JavaRDD parallelize = fromSparkContext.parallelize(arrayList, segmentCreationJobParallelism);
            final String str3 = fromSparkContext.getConf().contains("plugins.include") ? fromSparkContext.getConf().get("plugins.include") : null;
            final URI uri4 = uri;
            final URI uri5 = uri3 == null ? uri2 : uri3;
            parallelize.foreach(new VoidFunction<String>() { // from class: org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner.1
                public void call(String str4) throws Exception {
                    PluginManager.get().init();
                    for (PinotFSSpec pinotFSSpec2 : SparkSegmentGenerationJobRunner.this._spec.getPinotFSSpecs()) {
                        PinotFSFactory.register(pinotFSSpec2.getScheme(), pinotFSSpec2.getClassName(), new PinotConfiguration(pinotFSSpec2));
                    }
                    PinotFS create2 = PinotFSFactory.create(uri5.getScheme());
                    String[] split = str4.split(" ");
                    String str5 = split[0];
                    int intValue = Integer.valueOf(split[1]).intValue();
                    File file = new File("pinot-plugins.tar.gz");
                    if (file.exists()) {
                        File file2 = new File("pinot-plugins-dir-" + intValue);
                        try {
                            TarGzCompressionUtils.untar(file, file2);
                            SparkSegmentGenerationJobRunner.LOGGER.info("Trying to set System Property: [{}={}]", "plugins.dir", file2.getAbsolutePath());
                            System.setProperty("plugins.dir", file2.getAbsolutePath());
                            if (str3 != null) {
                                SparkSegmentGenerationJobRunner.LOGGER.info("Trying to set System Property: [{}={}]", "plugins.include", str3);
                                System.setProperty("plugins.include", str3);
                            }
                            SparkSegmentGenerationJobRunner.LOGGER.info("Pinot plugins System Properties are set at [{}], plugins includes [{}]", System.getProperty("plugins.dir"), System.getProperty("plugins.include"));
                        } catch (Exception e) {
                            SparkSegmentGenerationJobRunner.LOGGER.error("Failed to untar local Pinot plugins tarball file [{}]", file, e);
                            throw new RuntimeException(e);
                        }
                    } else {
                        SparkSegmentGenerationJobRunner.LOGGER.warn("Cannot find local Pinot plugins tar file at [{}]", file.getAbsolutePath());
                    }
                    URI create3 = URI.create(str5);
                    if (create3.getScheme() == null) {
                        create3 = new URI(uri4.getScheme(), create3.getSchemeSpecificPart(), create3.getFragment());
                    }
                    File file3 = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
                    File file4 = new File(file3, "input");
                    FileUtils.forceMkdir(file4);
                    File file5 = new File(file3, "output");
                    FileUtils.forceMkdir(file5);
                    File file6 = new File(file4, SegmentGenerationUtils.getFileName(create3));
                    SparkSegmentGenerationJobRunner.LOGGER.info("Trying to copy input file from {} to {}", create3, file6);
                    PinotFSFactory.create(create3.getScheme()).copyToLocalFile(create3, file6);
                    SegmentGenerationTaskSpec segmentGenerationTaskSpec = new SegmentGenerationTaskSpec();
                    segmentGenerationTaskSpec.setInputFilePath(file6.getAbsolutePath());
                    segmentGenerationTaskSpec.setOutputDirectoryPath(file5.getAbsolutePath());
                    segmentGenerationTaskSpec.setRecordReaderSpec(SparkSegmentGenerationJobRunner.this._spec.getRecordReaderSpec());
                    segmentGenerationTaskSpec.setSchema(SegmentGenerationUtils.getSchema(SparkSegmentGenerationJobRunner.this._spec.getTableSpec().getSchemaURI(), SparkSegmentGenerationJobRunner.this._spec.getAuthToken()));
                    segmentGenerationTaskSpec.setTableConfig(SegmentGenerationUtils.getTableConfig(SparkSegmentGenerationJobRunner.this._spec.getTableSpec().getTableConfigURI(), SparkSegmentGenerationJobRunner.this._spec.getAuthToken()));
                    segmentGenerationTaskSpec.setSequenceId(intValue);
                    segmentGenerationTaskSpec.setSegmentNameGeneratorSpec(SparkSegmentGenerationJobRunner.this._spec.getSegmentNameGeneratorSpec());
                    segmentGenerationTaskSpec.setFailOnEmptySegment(SparkSegmentGenerationJobRunner.this._spec.isFailOnEmptySegment());
                    segmentGenerationTaskSpec.setCustomProperty("input.data.file.uri", create3.toString());
                    String run = new SegmentGenerationTaskRunner(segmentGenerationTaskSpec).run();
                    File file7 = new File(file5, run);
                    String encode = URLEncoder.encode(run + ".tar.gz", CharEncoding.UTF_8);
                    File file8 = new File(file5, encode);
                    SparkSegmentGenerationJobRunner.LOGGER.info("Tarring segment from: {} to: {}", file7, file8);
                    TarGzCompressionUtils.createTarGzFile(file7, file8);
                    SparkSegmentGenerationJobRunner.LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", run, DataSizeUtils.fromBytes(FileUtils.sizeOf(file7)), DataSizeUtils.fromBytes(FileUtils.sizeOf(file8)));
                    URI relativeOutputPath = SegmentGenerationUtils.getRelativeOutputPath(uri4, create3, uri5);
                    SegmentGenerationJobUtils.moveLocalTarFileToRemote(file8, relativeOutputPath.resolve(encode), SparkSegmentGenerationJobRunner.this._spec.isOverwriteOutput());
                    String encode2 = URLEncoder.encode(run + ".metadata.tar.gz", CharEncoding.UTF_8);
                    URI resolve = relativeOutputPath.resolve(encode2);
                    if (create2.exists(resolve) && (SparkSegmentGenerationJobRunner.this._spec.isOverwriteOutput() || !SparkSegmentGenerationJobRunner.this._spec.isCreateMetadataTarGz())) {
                        SparkSegmentGenerationJobRunner.LOGGER.info("Deleting existing metadata tar gz file: {}", resolve);
                        create2.delete(resolve, true);
                    }
                    if (segmentGenerationTaskSpec.isCreateMetadataTarGz()) {
                        File file9 = new File(file5, encode2);
                        SegmentGenerationJobUtils.createSegmentMetadataTarGz(file7, file9);
                        SegmentGenerationJobUtils.moveLocalTarFileToRemote(file9, resolve, SparkSegmentGenerationJobRunner.this._spec.isOverwriteOutput());
                    }
                    FileUtils.deleteQuietly(file7);
                    FileUtils.deleteQuietly(file6);
                }
            });
            if (uri3 != null) {
                LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", uri3, uri2);
                create.copyDir(uri3, uri2);
            }
            if (uri3 != null) {
                LOGGER.info("Trying to clean up staging directory: [{}]", uri3);
                create.delete(uri3, true);
            }
        } catch (Throwable th) {
            if (uri3 != null) {
                LOGGER.info("Trying to clean up staging directory: [{}]", uri3);
                create.delete(uri3, true);
            }
            throw th;
        }
    }

    protected void addDepsJarToDistributedCache(JavaSparkContext javaSparkContext, String str) throws IOException {
        if (str != null) {
            URI create = URI.create(str);
            if (create.getScheme() == null) {
                create = new File(str).toURI();
            }
            PinotFS create2 = PinotFSFactory.create(create.getScheme());
            for (String str2 : create2.listFiles(create, true)) {
                if (!create2.isDirectory(URI.create(str2)) && str2.endsWith(".jar")) {
                    LOGGER.info("Adding deps jar: {} to distributed cache", str2);
                    javaSparkContext.addJar(str2);
                }
            }
        }
    }

    protected void packPluginsToDistributedCache(JavaSparkContext javaSparkContext) {
        String[] pluginsDirectories = PluginManager.get().getPluginsDirectories();
        if (pluginsDirectories == null) {
            LOGGER.warn("Plugin directories is null, skipping packaging...");
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (String str : pluginsDirectories) {
            File file = new File(str);
            if (!file.exists()) {
                LOGGER.warn("Cannot find Pinot plugins directory at [{}]", str);
                return;
            }
            arrayList.add(file);
        }
        File file2 = new File("pinot-plugins.tar.gz");
        try {
            TarGzCompressionUtils.createTarGzFile((File[]) arrayList.toArray(new File[0]), file2);
        } catch (IOException e) {
            LOGGER.error("Failed to tar plugins directories", (Throwable) e);
        }
        javaSparkContext.addFile(file2.getAbsolutePath());
        String property = System.getProperty("plugins.include");
        if (property != null) {
            javaSparkContext.getConf().set("plugins.include", property);
        }
    }
}
