package org.apache.pinot.plugin.ingestion.batch.standalone;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.class */
public class SegmentGenerationJobRunner implements IngestionJobRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SegmentGenerationJobRunner.class);
    private SegmentGenerationJobSpec _spec;
    private ExecutorService _executorService;
    private PinotFS _inputDirFS;
    private PinotFS _outputDirFS;
    private URI _inputDirURI;
    private URI _outputDirURI;
    private CountDownLatch _segmentCreationTaskCountDownLatch;
    private Schema _schema;
    private TableConfig _tableConfig;
    private AtomicReference<Exception> _failure;

    public SegmentGenerationJobRunner() {
    }

    public SegmentGenerationJobRunner(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        init(segmentGenerationJobSpec);
    }

    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        this._spec = segmentGenerationJobSpec;
        if (this._spec.getInputDirURI() == null) {
            throw new RuntimeException("Missing property 'inputDirURI' in 'jobSpec' file");
        }
        try {
            this._inputDirURI = SegmentGenerationUtils.getDirectoryURI(this._spec.getInputDirURI());
            if (this._spec.getOutputDirURI() == null) {
                throw new RuntimeException("Missing property 'outputDirURI' in 'jobSpec' file");
            }
            try {
                this._outputDirURI = SegmentGenerationUtils.getDirectoryURI(this._spec.getOutputDirURI());
                if (this._spec.getRecordReaderSpec() == null) {
                    throw new RuntimeException("Missing property 'recordReaderSpec' in 'jobSpec' file");
                }
                if (this._spec.getTableSpec() == null) {
                    throw new RuntimeException("Missing property 'tableSpec' in 'jobSpec' file");
                }
                if (this._spec.getTableSpec().getTableName() == null) {
                    throw new RuntimeException("Missing property 'tableName' in 'tableSpec'");
                }
                for (PinotFSSpec pinotFSSpec : this._spec.getPinotFSSpecs()) {
                    PinotFSFactory.register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
                }
                this._inputDirFS = PinotFSFactory.create(this._inputDirURI.getScheme());
                this._outputDirFS = PinotFSFactory.create(this._outputDirURI.getScheme());
                try {
                    if (!this._outputDirFS.exists(this._outputDirURI)) {
                        this._outputDirFS.mkdir(this._outputDirURI);
                    } else if (!this._outputDirFS.isDirectory(this._outputDirURI)) {
                        throw new RuntimeException(String.format("Output Directory URI: %s is not a directory", this._outputDirURI));
                    }
                    if (this._spec.getTableSpec().getSchemaURI() == null) {
                        if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                            throw new RuntimeException("Missing property 'schemaURI' in 'tableSpec'");
                        }
                        this._spec.getTableSpec().setSchemaURI(SegmentGenerationUtils.generateSchemaURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
                    }
                    this._schema = SegmentGenerationUtils.getSchema(this._spec.getTableSpec().getSchemaURI(), this._spec.getAuthToken());
                    if (this._spec.getTableSpec().getTableConfigURI() == null) {
                        if (this._spec.getPinotClusterSpecs() == null || this._spec.getPinotClusterSpecs().length == 0) {
                            throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
                        }
                        this._spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(this._spec.getPinotClusterSpecs()[0].getControllerURI(), this._spec.getTableSpec().getTableName()));
                    }
                    this._tableConfig = SegmentGenerationUtils.getTableConfig(this._spec.getTableSpec().getTableConfigURI(), segmentGenerationJobSpec.getAuthToken());
                    int segmentCreationJobParallelism = this._spec.getSegmentCreationJobParallelism();
                    int numThreads = JobUtils.getNumThreads(segmentCreationJobParallelism);
                    LOGGER.info("Creating an executor service with {} threads(Job parallelism: {}, available cores: {}.)", Integer.valueOf(numThreads), Integer.valueOf(segmentCreationJobParallelism), Integer.valueOf(Runtime.getRuntime().availableProcessors()));
                    this._executorService = Executors.newFixedThreadPool(numThreads);
                    this._failure = new AtomicReference<>();
                } catch (IOException e) {
                    throw new RuntimeException("Failed to validate output 'outputDirURI': " + this._outputDirURI, e);
                }
            } catch (URISyntaxException e2) {
                throw new RuntimeException("Invalid property: 'outputDirURI'", e2);
            }
        } catch (URISyntaxException e3) {
            throw new RuntimeException("Invalid property: 'inputDirURI'", e3);
        }
    }

    public void run() throws Exception {
        List<String> listMatchedFilesWithRecursiveOption = SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(this._inputDirFS, this._inputDirURI, this._spec.getIncludeFileNamePattern(), this._spec.getExcludeFileNamePattern(), this._spec.isSearchRecursively());
        File file = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
        try {
            int size = listMatchedFilesWithRecursiveOption.size();
            this._segmentCreationTaskCountDownLatch = new CountDownLatch(size);
            if (SegmentGenerationJobUtils.useGlobalDirectorySequenceId(this._spec.getSegmentNameGeneratorSpec())) {
                for (int i = 0; i < size; i++) {
                    submitSegmentGenTask(file, SegmentGenerationUtils.getFileURI((String) listMatchedFilesWithRecursiveOption.get(i), this._inputDirURI), i);
                }
            } else {
                HashMap hashMap = new HashMap();
                for (String str : listMatchedFilesWithRecursiveOption) {
                    ((List) hashMap.computeIfAbsent(Paths.get(str, new String[0]).getParent().toString(), str2 -> {
                        return new ArrayList();
                    })).add(str);
                }
                for (String str3 : hashMap.keySet()) {
                    List list = (List) hashMap.get(str3);
                    Collections.sort(list);
                    for (int i2 = 0; i2 < list.size(); i2++) {
                        submitSegmentGenTask(file, SegmentGenerationUtils.getFileURI((String) list.get(i2), SegmentGenerationUtils.getDirectoryURI(str3)), i2);
                    }
                }
            }
            this._segmentCreationTaskCountDownLatch.await();
            if (this._failure.get() != null) {
                this._executorService.shutdownNow();
                throw this._failure.get();
            }
        } finally {
            FileUtils.deleteQuietly(file);
            this._executorService.shutdown();
        }
    }

    private void submitSegmentGenTask(File file, URI uri, int i) throws Exception {
        File file2 = new File(file, "input");
        FileUtils.forceMkdir(file2);
        File file3 = new File(file, "output");
        FileUtils.forceMkdir(file3);
        File createLocalInputDateFile = createLocalInputDateFile(uri, file2);
        this._inputDirFS.copyToLocalFile(uri, createLocalInputDateFile);
        SegmentGenerationTaskSpec segmentGenerationTaskSpec = new SegmentGenerationTaskSpec();
        segmentGenerationTaskSpec.setOutputDirectoryPath(file3.getAbsolutePath());
        segmentGenerationTaskSpec.setRecordReaderSpec(this._spec.getRecordReaderSpec());
        segmentGenerationTaskSpec.setSchema(this._schema);
        segmentGenerationTaskSpec.setTableConfig(this._tableConfig);
        segmentGenerationTaskSpec.setSegmentNameGeneratorSpec(this._spec.getSegmentNameGeneratorSpec());
        segmentGenerationTaskSpec.setInputFilePath(createLocalInputDateFile.getAbsolutePath());
        segmentGenerationTaskSpec.setSequenceId(i);
        segmentGenerationTaskSpec.setFailOnEmptySegment(this._spec.isFailOnEmptySegment());
        segmentGenerationTaskSpec.setCustomProperty("input.data.file.uri", uri.toString());
        if (this._failure.get() != null) {
            LOGGER.info("Skipping Segment Generation Task for {} due to previous failures", uri);
        } else {
            LOGGER.info("Submitting one Segment Generation Task for {}", uri);
            this._executorService.submit(() -> {
                try {
                    try {
                        String run = new SegmentGenerationTaskRunner(segmentGenerationTaskSpec).run();
                        File file4 = new File(file3, run);
                        String encode = URLEncoder.encode(run + ".tar.gz", "UTF-8");
                        File file5 = new File(file3, encode);
                        LOGGER.info("Tarring segment from: {} to: {}", file4, file5);
                        TarGzCompressionUtils.createTarGzFile(file4, file5);
                        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", run, DataSizeUtils.fromBytes(FileUtils.sizeOf(file4)), DataSizeUtils.fromBytes(FileUtils.sizeOf(file5)));
                        URI resolve = SegmentGenerationUtils.getRelativeOutputPath(this._inputDirURI, uri, this._outputDirURI).resolve(encode);
                        if (this._spec.isOverwriteOutput() || !this._outputDirFS.exists(resolve)) {
                            this._outputDirFS.copyFromLocalFile(file5, resolve);
                        } else {
                            LOGGER.warn("Not overwrite existing output segment tar file: {}", Boolean.valueOf(this._outputDirFS.exists(resolve)));
                        }
                        this._segmentCreationTaskCountDownLatch.countDown();
                        FileUtils.deleteQuietly(file4);
                        FileUtils.deleteQuietly(file5);
                        FileUtils.deleteQuietly(createLocalInputDateFile);
                    } catch (Throwable th) {
                        this._failure.compareAndSet(null, new RuntimeException("Failed to generate Pinot segment for file - " + uri, th));
                        long count = this._segmentCreationTaskCountDownLatch.getCount();
                        for (int i2 = 0; i2 < count; i2++) {
                            this._segmentCreationTaskCountDownLatch.countDown();
                        }
                        this._segmentCreationTaskCountDownLatch.countDown();
                        FileUtils.deleteQuietly((File) null);
                        FileUtils.deleteQuietly((File) null);
                        FileUtils.deleteQuietly(createLocalInputDateFile);
                    }
                } catch (Throwable th2) {
                    this._segmentCreationTaskCountDownLatch.countDown();
                    FileUtils.deleteQuietly((File) null);
                    FileUtils.deleteQuietly((File) null);
                    FileUtils.deleteQuietly(createLocalInputDateFile);
                    throw th2;
                }
            });
        }
    }

    private File createLocalInputDateFile(URI uri, File file) {
        return new File(new File(file, UUID.randomUUID().toString()), new File(uri.getPath()).getName());
    }
}
