package org.apache.pinot.tools.admin.command;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.lang.ProcessBuilder;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.GroovyTemplateUtils;
import org.apache.pinot.tools.Command;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.CommandLineArgs;
import picocli.CommandLine;

@CommandLine.Command(name = "LaunchSparkDataIngestionJob")
/* loaded from: input_file:org/apache/pinot/tools/admin/command/LaunchSparkDataIngestionJobCommand.class */
public class LaunchSparkDataIngestionJobCommand extends AbstractBaseAdminCommand implements Command {
    private static final Logger LOGGER = LoggerFactory.getLogger(LaunchSparkDataIngestionJobCommand.class);

    /** Fully-qualified main class that Spark runs as the application entry point. */
    public static final String MAIN_CLASS = "org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand";
    /** Name of the environment variable that must point to the Spark installation. */
    public static final String SPARK_HOME = "SPARK_HOME";
    /** Name of the JVM system property consulted when {@code -pinotBaseDir} is not given. */
    public static final String BASEDIR = "basedir";
    /** URI prefix used when building the Spark app resource. */
    public static final String LOCAL_FILE_PREFIX = "local://";
    /** Marker used to recognise the main Pinot jar among the jars under the base directory. */
    public static final String PINOT_MAIN_JAR_PREFIX = "pinot-all";

    @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = {"Print this message."})
    private boolean _help = false;

    @CommandLine.Option(names = {"-jobSpecFile", "-jobSpec"}, required = true, description = {"Ingestion job spec file"})
    private String _jobSpecFile;

    @CommandLine.Option(names = {"-values"}, required = false, arity = "1..*", description = {"Context values set to the job spec template"})
    private List<String> _values;

    @CommandLine.Option(names = {"-propertyFile"}, required = false, description = {"A property file contains context values to set the job spec template"})
    private String _propertyFile;

    // Help text previously read "pinto-s3"; the example plugin name is "pinot-s3".
    @CommandLine.Option(names = {"-pluginsToLoad"}, required = false, arity = "1..*", split = ":", description = {"List of plugin name separated by : to load at runtime. e.g. pinot-s3:pinot-parquet"})
    private List<String> _pluginsToLoad;

    @CommandLine.Option(names = {"-pluginsToExclude"}, defaultValue = "pinot-kafka-0.9:pinot-kafka-2.0", required = false, arity = "1..*", split = ":", description = {"List of plugin name separated by : to not load at runtime. e.g. pinot-s3:pinot-parquet"})
    private List<String> _pluginsToExclude;

    @CommandLine.Option(names = {"-pinotBaseDir"}, required = false, description = {"Pinot binary installation directory"})
    private String _pinotBaseDir;

    @CommandLine.Option(names = {"-deployMode"}, required = false, description = {"Spark Deploy Mode"})
    private String _deployMode;

    @CommandLine.Option(names = {"-master"}, required = false, defaultValue = "local", description = {"Spark Master"})
    private String _sparkMaster;

    // The help text spells the values exactly as the SparkType constants are declared.
    @CommandLine.Option(names = {"-sparkVersion"}, required = false, defaultValue = "SPARK_3", description = {"Spark plugin to use - can be one of SPARK_2 or SPARK_3"})
    private SparkType _sparkVersion;

    // Spelled out as a literal so the option name does not depend on a TestNG constant.
    @CommandLine.Option(names = {"-verbose"}, required = false, defaultValue = "true", description = {"Enable verbose logging from launcher"})
    private boolean _verbose;

    @CommandLine.Option(names = {"-sparkConf"}, required = false, split = ":", mapFallbackValue = "", description = {"Additional Spark configuration values as key value pairs separated by : e.g. -sparkConf spark.executor.cores=2:num-executors=3"})
    private Map<String, String> _sparkConf;

    // Arguments picocli could not match to an option; execute() forwards them to the Spark application.
    @CommandLine.Unmatched
    private String[] _unmatchedArgs;
    // Stored by setAuthProvider; not read anywhere else in this class.
    private AuthProvider _authProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pinot/tools/admin/command/LaunchSparkDataIngestionJobCommand$SparkType.class */
    /**
     * Supported Spark flavours. Each constant carries the Spark version string, the name of the
     * Pinot batch-ingestion plugin built for it, and the Java version that {@code execute()}
     * compares the running JVM against ("at most") before launching.
     */
    public enum SparkType {
        SPARK_2("2.4.0", "pinot-batch-ingestion-spark-2.4", JavaVersion.JAVA_1_8),
        SPARK_3("3.2.1", "pinot-batch-ingestion-spark-3.2", JavaVersion.JAVA_11);

        private final String _sparkVersion;
        private final String _pluginName;
        private final JavaVersion _javaVersion;

        SparkType(String sparkVersion, String pluginName, JavaVersion javaVersion) {
            _sparkVersion = sparkVersion;
            _pluginName = pluginName;
            _javaVersion = javaVersion;
        }

        /** @return the Spark version string associated with this constant */
        public String getSparkVersion() {
            return _sparkVersion;
        }

        /** @return the name of the Pinot Spark ingestion plugin for this constant */
        public String getPluginName() {
            return _pluginName;
        }

        /** @return the Java version associated with this constant */
        public JavaVersion getJavaVersion() {
            return _javaVersion;
        }
    }

    /** @return the ingestion job spec file given via {@code -jobSpecFile} */
    public String getJobSpecFile() {
        return _jobSpecFile;
    }

    /** Sets the ingestion job spec file, equivalent to passing {@code -jobSpecFile}. */
    public void setJobSpecFile(String str) {
        _jobSpecFile = str;
    }

    /** @return the template context values given via {@code -values}; may be {@code null} */
    public List<String> getValues() {
        return _values;
    }

    /** Sets the template context values, equivalent to passing {@code -values}. */
    public void setValues(List<String> list) {
        _values = list;
    }

    /** @return the property file given via {@code -propertyFile}; may be {@code null} */
    public String getPropertyFile() {
        return _propertyFile;
    }

    /** Sets the property file, equivalent to passing {@code -propertyFile}. */
    public void setPropertyFile(String str) {
        _propertyFile = str;
    }

    /** Stores the given auth provider on this command. */
    public void setAuthProvider(AuthProvider authProvider) {
        _authProvider = authProvider;
    }

    /** @return {@code true} when one of the help options was passed */
    @Override // org.apache.pinot.tools.Command
    public boolean getHelp() {
        return _help;
    }

    /** Sets the help flag. */
    public void setHelp(boolean z) {
        _help = z;
    }

    /**
     * Launches the Pinot data ingestion job as a Spark application through {@link SparkLauncher}
     * and blocks until the launched process terminates.
     *
     * <p>Steps: resolve the Pinot base directory (option {@code -pinotBaseDir}, falling back to the
     * {@code basedir} system property), verify that {@code SPARK_HOME} is set, register the file
     * systems declared in the job spec, collect plugin and main jars from the base directory, then
     * forward the job spec, property file, template values, Spark configuration and any unmatched
     * arguments to the Spark application.
     *
     * @return {@code true} if the launched Spark process exited with code 0, {@code false} otherwise
     * @throws Exception if the base directory cannot be resolved, {@code SPARK_HOME} is unset, or
     *     preparing/launching the job fails; failures inside the launch section are logged first
     */
    @Override // org.apache.pinot.tools.Command
    public boolean execute() throws Exception {
        if (_pinotBaseDir == null) {
            // BASEDIR is read as a JVM system property, not as an environment variable.
            String baseDirProperty = System.getProperty(BASEDIR);
            if (baseDirProperty == null) {
                throw new RuntimeException(String.format("Either option -pinotBaseDir or system property %s must be set. Currently null", BASEDIR));
            }
            _pinotBaseDir = baseDirProperty;
        }
        Preconditions.checkNotNull(System.getenv(SPARK_HOME), "SPARK_HOME environment variable should be set to Spark installation path");
        if (!SystemUtils.isJavaVersionAtMost(_sparkVersion.getJavaVersion())) {
            LOGGER.warn("Platform java version should be at most: {}, found: {}. Ignore this warning if you are running from different environment than your spark cluster", _sparkVersion.getJavaVersion(), SystemUtils.JAVA_SPECIFICATION_VERSION);
        }
        SparkLauncher sparkLauncher = new SparkLauncher();
        sparkLauncher.setMaster(_sparkMaster);
        if (_deployMode != null) {
            sparkLauncher.setDeployMode(_deployMode);
        }
        sparkLauncher.setMainClass(MAIN_CLASS);
        try {
            // Register the job spec's file systems first: the jar discovery below goes through PinotFSFactory.
            for (PinotFSSpec pinotFSSpec : IngestionJobLauncher.getSegmentGenerationJobSpec(_jobSpecFile, _propertyFile, GroovyTemplateUtils.getTemplateContext(_values), System.getenv()).getPinotFSSpecs()) {
                PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
            }

            // The same classpath entries are used for both the driver and the executors.
            List<String> extraClassPaths = new ArrayList<>();
            addDepsJarToDistributedCache(sparkLauncher, _pinotBaseDir, extraClassPaths);
            addAppResource(sparkLauncher, _pinotBaseDir, extraClassPaths);
            String extraClassPath = Joiner.on(":").join(extraClassPaths);
            sparkLauncher.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, extraClassPath);
            sparkLauncher.setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, extraClassPath);

            if (isClusterDeployMode()) {
                // In cluster mode the spec file is shipped with the job and referenced by bare file name.
                sparkLauncher.addFile(_jobSpecFile);
                sparkLauncher.addAppArgs("-jobSpecFile", FilenameUtils.getName(_jobSpecFile));
            } else {
                sparkLauncher.addAppArgs("-jobSpecFile", _jobSpecFile);
            }
            if (_propertyFile != null) {
                sparkLauncher.addAppArgs("-propertyFile", _propertyFile);
            }
            if (_values != null) {
                sparkLauncher.addAppArgs("-values", Joiner.on(",").join(_values));
            }
            if (_sparkConf != null) {
                for (Map.Entry<String, String> conf : _sparkConf.entrySet()) {
                    // Keys starting with "spark" are Spark configuration; anything else becomes a "--key value" launcher argument.
                    if (conf.getKey().startsWith("spark")) {
                        sparkLauncher.setConf(conf.getKey(), conf.getValue());
                    } else {
                        sparkLauncher.addSparkArg("--" + conf.getKey(), conf.getValue());
                    }
                }
            }
            if (_unmatchedArgs != null) {
                sparkLauncher.addAppArgs(_unmatchedArgs);
            }
            sparkLauncher.setAppName("Pinot Spark Ingestion Job");
            sparkLauncher.setVerbose(_verbose);
            sparkLauncher.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            sparkLauncher.redirectError(ProcessBuilder.Redirect.INHERIT);

            // Report a failed Spark job to the caller instead of unconditionally returning success.
            int exitCode = sparkLauncher.launch().waitFor();
            if (exitCode != 0) {
                LOGGER.error("Spark data ingestion job exited with non-zero exit code: {}", exitCode);
                return false;
            }
            return true;
        } catch (Exception e) {
            LOGGER.error("Got exception to generate IngestionJobSpec for data ingestion job - ", e);
            throw e;
        }
    }

    /**
     * Finds the main Pinot jar(s) under the base directory and registers them as the Spark
     * application resource.
     *
     * <p>A file qualifies when it is not a directory, ends with {@code .jar}, and its <em>file name</em>
     * contains {@link #PINOT_MAIN_JAR_PREFIX}. Matching on the file name rather than the whole path
     * keeps a base directory whose path happens to contain the marker from selecting every jar.
     * If several files qualify, the last one processed is the app resource in effect, because
     * each call to {@code setAppResource} replaces the previous value.
     *
     * @param sparkLauncher launcher to configure
     * @param str Pinot base directory (path or URI); nothing is done when {@code null}
     * @param list classpath entries to append to: the bare jar name in cluster mode or for
     *     non-local URIs, otherwise the full path
     * @throws IOException if listing or inspecting files fails
     */
    private void addAppResource(SparkLauncher sparkLauncher, String str, List<String> list) throws IOException {
        if (str == null) {
            return;
        }
        URI baseDirUri = URI.create(str);
        if (baseDirUri.getScheme() == null) {
            // A plain path without a scheme is treated as a local file.
            baseDirUri = new File(str).toURI();
        }
        PinotFS pinotFS = PinotFSFactory.create(baseDirUri.getScheme());
        for (String filePath : pinotFS.listFiles(baseDirUri, true)) {
            // Cheap string checks go first so the file system is only queried for candidate jars.
            if (!filePath.endsWith(".jar")) {
                continue;
            }
            String jarName = FilenameUtils.getName(filePath);
            if (!jarName.contains(PINOT_MAIN_JAR_PREFIX)) {
                continue;
            }
            URI fileUri = URI.create(filePath);
            if (pinotFS.isDirectory(fileUri)) {
                continue;
            }
            LOGGER.info("Adding jar: {} to appResource", filePath);
            // Cluster mode and non-local URIs reference the jar by bare name; otherwise the full path is used.
            String resource = (isClusterDeployMode() || !isLocalFileUri(fileUri)) ? jarName : filePath;
            sparkLauncher.setAppResource(LOCAL_FILE_PREFIX + resource);
            sparkLauncher.addJar(filePath);
            list.add(resource);
        }
    }

    /**
     * Walks the base directory recursively and registers every jar whose parent directory name
     * passes {@code shouldLoadPlugin}.
     *
     * @param sparkLauncher launcher to configure
     * @param str Pinot base directory (path or URI); nothing is done when {@code null}
     * @param list classpath entries collected so far; appended to by {@code addJarFilePath}
     * @throws IOException if listing or inspecting files fails
     */
    private void addDepsJarToDistributedCache(SparkLauncher sparkLauncher, String str, List<String> list) throws IOException {
        if (str == null) {
            return;
        }
        URI baseDirUri = URI.create(str);
        if (baseDirUri.getScheme() == null) {
            // A plain path without a scheme is treated as a local file.
            baseDirUri = new File(str).toURI();
        }
        PinotFS pinotFS = PinotFSFactory.create(baseDirUri.getScheme());
        for (String filePath : pinotFS.listFiles(baseDirUri, true)) {
            if (pinotFS.isDirectory(URI.create(filePath)) || !filePath.endsWith(".jar")) {
                continue;
            }
            // The plugin name is the name of the directory that directly contains the jar.
            String parentDirPath = filePath.substring(0, filePath.lastIndexOf('/'));
            if (shouldLoadPlugin(FilenameUtils.getName(parentDirPath))) {
                addJarFilePath(sparkLauncher, list, filePath);
            }
        }
    }

    /**
     * Decides whether jars from the plugin directory with the given name are shipped with the job.
     *
     * <p>Without an explicit {@code -pluginsToLoad} list, a plugin is selected unless it is excluded
     * or its name contains "spark"; with an explicit list, only listed plugins are selected. The
     * plugin matching the chosen Spark version is always selected.
     *
     * @param str plugin directory name
     * @return {@code true} if the plugin's jars should be added
     */
    private boolean shouldLoadPlugin(String str) {
        boolean selected;
        if (_pluginsToLoad == null) {
            selected = !_pluginsToExclude.contains(str) && !str.contains("spark");
        } else {
            selected = _pluginsToLoad.contains(str);
        }
        return selected || _sparkVersion.getPluginName().contentEquals(str);
    }

    /**
     * Adds a dependency jar to the launcher and records the matching classpath entry.
     *
     * @param sparkLauncher launcher to configure
     * @param list classpath entries to append to
     * @param str path or URI of the jar
     */
    private void addJarFilePath(SparkLauncher sparkLauncher, List<String> list, String str) {
        URI jarUri = URI.create(str);
        sparkLauncher.addJar(str);
        boolean useFullPath = !isClusterDeployMode() && isLocalFileUri(jarUri);
        LOGGER.info("Adding deps jar: {} to distributed cache", str);
        if (useFullPath) {
            // Local jars outside cluster mode are referenced by their full path.
            list.add(str);
        } else {
            // Otherwise only the bare file name goes on the classpath; an empty name is skipped.
            String jarName = FilenameUtils.getName(str);
            if (!jarName.isEmpty()) {
                list.add(jarName);
            }
        }
    }

    /**
     * @param uri URI to inspect
     * @return {@code true} if the URI has no scheme or its scheme is exactly {@code file}
     */
    private boolean isLocalFileUri(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            return true;
        }
        return "file".equals(scheme);
    }

    /** @return {@code true} if {@code -deployMode} was given and is exactly {@code cluster} */
    private boolean isClusterDeployMode() {
        return "cluster".equals(_deployMode);
    }

    /** @return the command name, identical to the name declared in the picocli {@code @Command} annotation */
    @Override // org.apache.pinot.tools.AbstractBaseCommand
    public String getName() {
        return "LaunchSparkDataIngestionJob";
    }

    /**
     * Renders the command roughly as it would be typed: the job spec file always, the property
     * file and template values only when set.
     */
    @Override
    public String toString() {
        StringBuilder command = new StringBuilder("LaunchSparkDataIngestionJob -jobSpecFile ").append(_jobSpecFile);
        if (_propertyFile != null) {
            command.append(" -propertyFile ").append(_propertyFile);
        }
        if (_values != null) {
            command.append(" -values ").append(Arrays.toString(_values.toArray()));
        }
        return command.toString();
    }

    /** @return a one-line, human-readable description of this command */
    @Override // org.apache.pinot.tools.Command
    public String description() {
        return "Launch a data ingestion job.";
    }

    /**
     * Stand-alone entry point: initialises the plugin manager, runs this command through picocli
     * and exits the JVM with the resulting exit code.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        PluginManager.get().init();
        CommandLine commandLine = new CommandLine(new LaunchSparkDataIngestionJobCommand());
        int exitCode = commandLine.execute(args);
        System.exit(exitCode);
    }
}
