package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushResult;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationTaskSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.class */
public class SegmentGenerationAndPushTaskExecutor extends BaseTaskExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SegmentGenerationAndPushTaskExecutor.class);
    private static final int DEFUALT_PUSH_ATTEMPTS = 5;
    private static final int DEFAULT_PUSH_PARALLELISM = 1;
    private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000;
    private PinotTaskConfig _pinotTaskConfig;
    private MinionEventObserver _eventObserver;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor
    public SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.pinot.minion.executor.PinotTaskExecutor
    public Object executeTask(PinotTaskConfig pinotTaskConfig) throws Exception {
        LOGGER.info("Executing SegmentGenerationAndPushTask with task config: {}", pinotTaskConfig);
        Map<String, String> configs = pinotTaskConfig.getConfigs();
        SegmentGenerationAndPushResult.Builder builder = new SegmentGenerationAndPushResult.Builder();
        File file = new File(new File(MinionContext.getInstance().getDataDir(), "SegmentGenerationAndPushResult"), "tmp-" + UUID.randomUUID());
        this._pinotTaskConfig = pinotTaskConfig;
        this._eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
        try {
            try {
                SegmentGenerationAndPushResult generateAndPushSegment = generateAndPushSegment(generateTaskSpec(configs, file), builder, configs);
                FileUtils.deleteQuietly(file);
                return generateAndPushSegment;
            } catch (Exception e) {
                throw new RuntimeException("Failed to execute SegmentGenerationAndPushTask", e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    private SegmentGenerationAndPushResult generateAndPushSegment(SegmentGenerationTaskSpec segmentGenerationTaskSpec, SegmentGenerationAndPushResult.Builder builder, Map<String, String> map) throws Exception {
        this._eventObserver.notifyProgress(this._pinotTaskConfig, "Generating segment");
        String run = new SegmentGenerationTaskRunner(segmentGenerationTaskSpec).run();
        this._eventObserver.notifyProgress(this._pinotTaskConfig, "Compressing segment: " + run);
        File tarSegmentDir = tarSegmentDir(segmentGenerationTaskSpec, run);
        this._eventObserver.notifyProgress(this._pinotTaskConfig, String.format("Moving segment: %s to output dir", run));
        URI moveSegmentToOutputPinotFS = moveSegmentToOutputPinotFS(map, tarSegmentDir);
        LOGGER.info("Moved generated segment from [{}] to location: [{}]", tarSegmentDir, moveSegmentToOutputPinotFS);
        builder.setSegmentName(run);
        this._eventObserver.notifyProgress(this._pinotTaskConfig, "Pushing segment: " + run);
        pushSegment(segmentGenerationTaskSpec.getTableConfig().getTableName(), map, moveSegmentToOutputPinotFS);
        builder.setSucceed(true);
        return builder.build();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x009a. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:15:0x0187  */
    /* JADX WARN: Removed duplicated region for block: B:17:0x01ae A[ORIG_RETURN, RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void pushSegment(java.lang.String r8, java.util.Map<java.lang.String, java.lang.String> r9, java.net.URI r10) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 431
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush.SegmentGenerationAndPushTaskExecutor.pushSegment(java.lang.String, java.util.Map, java.net.URI):void");
    }

    private SegmentGenerationJobSpec generatePushJobSpec(String str, Map<String, String> map, PushJobSpec pushJobSpec) {
        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName(str);
        PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
        pinotClusterSpec.setControllerURI(map.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
        PinotClusterSpec[] pinotClusterSpecArr = {pinotClusterSpec};
        SegmentGenerationJobSpec segmentGenerationJobSpec = new SegmentGenerationJobSpec();
        segmentGenerationJobSpec.setPushJobSpec(pushJobSpec);
        segmentGenerationJobSpec.setTableSpec(tableSpec);
        segmentGenerationJobSpec.setPinotClusterSpecs(pinotClusterSpecArr);
        segmentGenerationJobSpec.setAuthToken(map.get("authToken"));
        return segmentGenerationJobSpec;
    }

    private URI moveSegmentToOutputPinotFS(Map<String, String> map, File file) throws Exception {
        if (!map.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
            return file.toURI();
        }
        URI create = URI.create(map.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
        PinotFS outputPinotFS = MinionTaskUtils.getOutputPinotFS(map, create);
        try {
            URI create2 = URI.create(create + file.getName());
            if (Boolean.parseBoolean(map.get(BatchConfigProperties.OVERWRITE_OUTPUT)) || !outputPinotFS.exists(create)) {
                outputPinotFS.copyFromLocalFile(file, create2);
            } else {
                LOGGER.warn("Not overwrite existing output segment tar file: {}", Boolean.valueOf(outputPinotFS.exists(create)));
            }
            if (outputPinotFS != null) {
                outputPinotFS.close();
            }
            return create2;
        } catch (Throwable th) {
            if (outputPinotFS != null) {
                try {
                    outputPinotFS.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private File tarSegmentDir(SegmentGenerationTaskSpec segmentGenerationTaskSpec, String str) throws IOException {
        File file = new File(segmentGenerationTaskSpec.getOutputDirectoryPath());
        File file2 = new File(file, str);
        File file3 = new File(file, str + ".tar.gz");
        LOGGER.info("Tarring segment from: {} to: {}", file2, file3);
        TarGzCompressionUtils.createTarGzFile(file2, file3);
        LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", str, DataSizeUtils.fromBytes(FileUtils.sizeOf(file2)), DataSizeUtils.fromBytes(FileUtils.sizeOf(file3)));
        return file3;
    }

    protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> map, File file) throws Exception {
        SegmentGenerationTaskSpec segmentGenerationTaskSpec = new SegmentGenerationTaskSpec();
        URI create = URI.create(map.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
        PinotFS inputPinotFS = MinionTaskUtils.getInputPinotFS(map, create);
        try {
            File file2 = new File(file, "input");
            FileUtils.forceMkdir(file2);
            File file3 = new File(file, "output");
            FileUtils.forceMkdir(file3);
            segmentGenerationTaskSpec.setOutputDirectoryPath(file3.getAbsolutePath());
            this._eventObserver.notifyProgress(this._pinotTaskConfig, String.format("Copying file: %s to local disk", create));
            File file4 = new File(file2, new File(create.getPath()).getName());
            inputPinotFS.copyToLocalFile(create, file4);
            segmentGenerationTaskSpec.setInputFilePath(file4.getAbsolutePath());
            RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
            recordReaderSpec.setDataFormat(map.get(BatchConfigProperties.INPUT_FORMAT));
            recordReaderSpec.setClassName(map.get(BatchConfigProperties.RECORD_READER_CLASS));
            recordReaderSpec.setConfigClassName(map.get(BatchConfigProperties.RECORD_READER_CONFIG_CLASS));
            recordReaderSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(map, BatchConfigProperties.RECORD_READER_PROP_PREFIX));
            segmentGenerationTaskSpec.setRecordReaderSpec(recordReaderSpec);
            String str = map.get("authToken");
            String str2 = map.get("tableName");
            segmentGenerationTaskSpec.setSchema(map.containsKey(BatchConfigProperties.SCHEMA) ? (Schema) JsonUtils.stringToObject(JsonUtils.objectToString(map.get(BatchConfigProperties.SCHEMA)), Schema.class) : map.containsKey(BatchConfigProperties.SCHEMA_URI) ? SegmentGenerationUtils.getSchema(map.get(BatchConfigProperties.SCHEMA_URI), str) : getSchema(str2));
            segmentGenerationTaskSpec.setTableConfig(map.containsKey(BatchConfigProperties.TABLE_CONFIGS) ? (TableConfig) JsonUtils.stringToObject(map.get(BatchConfigProperties.TABLE_CONFIGS), TableConfig.class) : map.containsKey(BatchConfigProperties.TABLE_CONFIGS_URI) ? SegmentGenerationUtils.getTableConfig(map.get(BatchConfigProperties.TABLE_CONFIGS_URI), str) : getTableConfig(str2));
            segmentGenerationTaskSpec.setSequenceId(Integer.parseInt(map.get("sequenceId")));
            if (map.containsKey(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)) {
                segmentGenerationTaskSpec.setFailOnEmptySegment(Boolean.parseBoolean(map.get(BatchConfigProperties.FAIL_ON_EMPTY_SEGMENT)));
            }
            SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
            segmentNameGeneratorSpec.setType(map.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
            segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(map, BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
            segmentNameGeneratorSpec.addConfig("append.uuid.to.segment.name", map.getOrDefault("append.uuid.to.segment.name", Boolean.toString(false)));
            segmentGenerationTaskSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
            segmentGenerationTaskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, create.toString());
            if (inputPinotFS != null) {
                inputPinotFS.close();
            }
            return segmentGenerationTaskSpec;
        } catch (Throwable th) {
            if (inputPinotFS != null) {
                try {
                    inputPinotFS.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
