package org.apache.pinot.plugin.minion.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.class */
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
    private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";
    private static final int DEFUALT_PUSH_ATTEMPTS = 5;
    private static final int DEFAULT_PUSH_PARALLELISM = 1;
    private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000;
    protected MinionConf _minionConf;
    protected PinotTaskConfig _pinotTaskConfig;
    protected MinionEventObserver _eventObserver;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pinot$spi$ingestion$batch$BatchConfigProperties$SegmentPushType = new int[BatchConfigProperties.SegmentPushType.values().length];

        static {
            try {
                $SwitchMap$org$apache$pinot$spi$ingestion$batch$BatchConfigProperties$SegmentPushType[BatchConfigProperties.SegmentPushType.TAR.ordinal()] = BaseMultipleSegmentsConversionExecutor.DEFAULT_PUSH_PARALLELISM;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pinot$spi$ingestion$batch$BatchConfigProperties$SegmentPushType[BatchConfigProperties.SegmentPushType.METADATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor$SegmentUploadContext.class */
    public static class SegmentUploadContext {
        private final PinotTaskConfig _pinotTaskConfig;
        private final List<SegmentConversionResult> _segmentConversionResults;
        private final String _tableNameWithType;
        private final String _uploadURL;
        private final AuthProvider _authProvider;
        private final String _inputSegmentNames;
        private final boolean _replaceSegmentsEnabled;
        private final Map<String, Object> _customMap;

        public SegmentUploadContext(PinotTaskConfig pinotTaskConfig, List<SegmentConversionResult> list) {
            this._pinotTaskConfig = pinotTaskConfig;
            this._segmentConversionResults = list;
            Map configs = pinotTaskConfig.getConfigs();
            this._tableNameWithType = (String) configs.get("tableName");
            this._uploadURL = (String) configs.get("uploadURL");
            this._authProvider = AuthProviderUtils.makeAuthProvider((String) configs.get("authToken"));
            this._inputSegmentNames = (String) configs.get("segmentName");
            this._replaceSegmentsEnabled = Boolean.parseBoolean((String) configs.get("enableReplaceSegments"));
            this._customMap = new HashMap();
        }

        public PinotTaskConfig getPinotTaskConfig() {
            return this._pinotTaskConfig;
        }

        public List<SegmentConversionResult> getSegmentConversionResults() {
            return this._segmentConversionResults;
        }

        public String getTableNameWithType() {
            return this._tableNameWithType;
        }

        public String getUploadURL() {
            return this._uploadURL;
        }

        public AuthProvider getAuthProvider() {
            return this._authProvider;
        }

        public String getInputSegmentNames() {
            return this._inputSegmentNames;
        }

        public boolean isReplaceSegmentsEnabled() {
            return this._replaceSegmentsEnabled;
        }

        public Object getCustomContext(String str) {
            return this._customMap.get(str);
        }

        public void setCustomContext(String str, Object obj) {
            this._customMap.put(str, obj);
        }
    }

    public BaseMultipleSegmentsConversionExecutor(MinionConf minionConf) {
        this._minionConf = minionConf;
    }

    protected abstract List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> list, File file) throws Exception;

    protected void preProcess(PinotTaskConfig pinotTaskConfig) throws Exception {
        Map configs = pinotTaskConfig.getConfigs();
        String str = (String) configs.get("tableName");
        String str2 = (String) configs.get("segmentName");
        Set<String> segmentNamesForTable = SegmentConversionUtils.getSegmentNamesForTable(str, FileUploadDownloadClient.extractBaseURI(new URI((String) configs.get("uploadURL"))), AuthProviderUtils.makeAuthProvider((String) configs.get("authToken")));
        HashSet hashSet = new HashSet(Arrays.asList(str2.split(",")));
        hashSet.removeAll(segmentNamesForTable);
        if (!CollectionUtils.isEmpty(hashSet)) {
            throw new RuntimeException(String.format("table: %s does have the following segments to process: %s", str, hashSet));
        }
    }

    protected void postProcess(PinotTaskConfig pinotTaskConfig) throws Exception {
    }

    protected void preUploadSegments(SegmentUploadContext segmentUploadContext) throws Exception {
        this._eventObserver.notifyProgress(this._pinotTaskConfig, "Prepare to upload segments: " + segmentUploadContext.getSegmentConversionResults().size());
        if (segmentUploadContext.isReplaceSegmentsEnabled()) {
            segmentUploadContext.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, SegmentConversionUtils.startSegmentReplace(segmentUploadContext.getTableNameWithType(), segmentUploadContext.getUploadURL(), new StartReplaceSegmentsRequest((List) Arrays.stream(StringUtils.split(segmentUploadContext.getInputSegmentNames(), ",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.toList()), (List) segmentUploadContext.getSegmentConversionResults().stream().map((v0) -> {
                return v0.getSegmentName();
            }).collect(Collectors.toList())), segmentUploadContext.getAuthProvider()));
        }
    }

    protected void postUploadSegments(SegmentUploadContext segmentUploadContext) throws Exception {
        this._eventObserver.notifyProgress(this._pinotTaskConfig, "Finishing uploading segments: " + segmentUploadContext.getSegmentConversionResults().size());
        if (segmentUploadContext.isReplaceSegmentsEnabled()) {
            SegmentConversionUtils.endSegmentReplace(segmentUploadContext.getTableNameWithType(), segmentUploadContext.getUploadURL(), (String) segmentUploadContext.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID), this._minionConf.getEndReplaceSegmentsTimeoutMs(), segmentUploadContext.getAuthProvider());
        }
    }

    @VisibleForTesting
    public void setMinionEventObserver(MinionEventObserver minionEventObserver) {
        this._eventObserver = minionEventObserver;
    }

    /* renamed from: executeTask, reason: merged with bridge method [inline-methods] */
    public List<SegmentConversionResult> m1executeTask(PinotTaskConfig pinotTaskConfig) throws Exception {
        URI uri;
        preProcess(pinotTaskConfig);
        this._pinotTaskConfig = pinotTaskConfig;
        this._eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
        String taskType = pinotTaskConfig.getTaskType();
        Map<String, String> configs = pinotTaskConfig.getConfigs();
        String str = configs.get("tableName");
        String str2 = configs.get("segmentName");
        String str3 = configs.get("uploadURL");
        String str4 = configs.get("downloadURL");
        String[] split = str4.split(",");
        AuthProvider makeAuthProvider = AuthProviderUtils.makeAuthProvider(configs.get("authToken"));
        LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", new Object[]{taskType, str, str2, str4, str3});
        File file = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
        Preconditions.checkState(file.mkdirs());
        String crypterClassName = getTableConfig(str).getValidationConfig().getCrypterClassName();
        try {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < split.length; i += DEFAULT_PUSH_PARALLELISM) {
                this._eventObserver.notifyProgress(this._pinotTaskConfig, String.format("Downloading segment from: %s (%d out of %d)", split[i], Integer.valueOf(i + DEFAULT_PUSH_PARALLELISM), Integer.valueOf(split.length)));
                File file2 = new File(file, "tarredSegmentFile_" + i);
                LOGGER.info("Downloading segment from {} to {}", split[i], file2.getAbsolutePath());
                SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(split[i], file2, crypterClassName);
                this._eventObserver.notifyProgress(this._pinotTaskConfig, String.format("Decompressing segment from: %s (%d out of %d)", split[i], Integer.valueOf(i + DEFAULT_PUSH_PARALLELISM), Integer.valueOf(split.length)));
                arrayList.add((File) TarGzCompressionUtils.untar(file2, new File(file, "segmentDir_" + i)).get(0));
                if (!FileUtils.deleteQuietly(file2)) {
                    LOGGER.warn("Failed to delete tarred input segment: {}", file2.getAbsolutePath());
                }
            }
            File file3 = new File(file, "workingDir");
            Preconditions.checkState(file3.mkdir());
            List<SegmentConversionResult> convert = convert(pinotTaskConfig, arrayList, file3);
            File file4 = new File(file, "convertedTarredSegmentDir");
            Preconditions.checkState(file4.mkdir());
            int size = convert.size();
            ArrayList arrayList2 = new ArrayList(size);
            int i2 = DEFAULT_PUSH_PARALLELISM;
            for (SegmentConversionResult segmentConversionResult : convert) {
                MinionEventObserver minionEventObserver = this._eventObserver;
                PinotTaskConfig pinotTaskConfig2 = this._pinotTaskConfig;
                int i3 = i2;
                i2 += DEFAULT_PUSH_PARALLELISM;
                minionEventObserver.notifyProgress(pinotTaskConfig2, String.format("Compressing segment: %s (%d out of %d)", segmentConversionResult.getSegmentName(), Integer.valueOf(i3), Integer.valueOf(size)));
                File file5 = segmentConversionResult.getFile();
                File file6 = new File(file4, segmentConversionResult.getSegmentName() + ".tar.gz");
                TarGzCompressionUtils.createTarGzFile(file5, file6);
                arrayList2.add(file6);
                if (!FileUtils.deleteQuietly(file5)) {
                    LOGGER.warn("Failed to delete converted segment: {}", file5.getAbsolutePath());
                }
            }
            for (File file7 : arrayList) {
                if (file7.exists() && !FileUtils.deleteQuietly(file7)) {
                    LOGGER.warn("Failed to delete input segment: {}", file7.getAbsolutePath());
                }
            }
            if (this._cancelled) {
                LOGGER.info("{} on table: {}, segments: {} got cancelled", new Object[]{taskType, str, str2});
                throw new TaskCancelledException(taskType + " on table: " + str + ", segments: " + str2 + " got cancelled");
            }
            SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, convert);
            preUploadSegments(segmentUploadContext);
            for (int i4 = 0; i4 < size; i4 += DEFAULT_PUSH_PARALLELISM) {
                File file8 = (File) arrayList2.get(i4);
                SegmentConversionResult segmentConversionResult2 = convert.get(i4);
                this._eventObserver.notifyProgress(this._pinotTaskConfig, String.format("Uploading segment: %s (%d out of %d)", segmentConversionResult2.getSegmentName(), Integer.valueOf(i4 + DEFAULT_PUSH_PARALLELISM), Integer.valueOf(size)));
                BasicHeader basicHeader = new BasicHeader("Pinot-SegmentZKMetadataCustomMapModifier", getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult2).toJsonString());
                if (BatchConfigProperties.SegmentPushType.valueOf(configs.getOrDefault("push.mode", BatchConfigProperties.SegmentPushType.TAR.name()).toUpperCase()) != BatchConfigProperties.SegmentPushType.TAR) {
                    uri = moveSegmentToOutputPinotFS(configs, file8);
                    LOGGER.info("Moved generated segment from [{}] to location: [{}]", file8, uri);
                } else {
                    uri = file8.toURI();
                }
                ArrayList arrayList3 = new ArrayList();
                arrayList3.add(basicHeader);
                arrayList3.addAll(AuthProviderUtils.toRequestHeaders(makeAuthProvider));
                BasicNameValuePair basicNameValuePair = new BasicNameValuePair("enableParallelPushProtection", "true");
                BasicNameValuePair basicNameValuePair2 = new BasicNameValuePair("tableName", TableNameBuilder.extractRawTableName(str));
                BasicNameValuePair basicNameValuePair3 = new BasicNameValuePair("tableType", TableNameBuilder.getTableTypeFromTableName(str).toString());
                if ("RealtimeToOfflineSegmentsTask".equals(taskType)) {
                    basicNameValuePair3 = new BasicNameValuePair("tableType", TableType.OFFLINE.toString());
                }
                pushSegment(basicNameValuePair2.getValue(), configs, uri, arrayList3, Arrays.asList(basicNameValuePair, basicNameValuePair2, basicNameValuePair3), segmentConversionResult2);
                if (!FileUtils.deleteQuietly(file8)) {
                    LOGGER.warn("Failed to delete tarred converted segment: {}", file8.getAbsolutePath());
                }
            }
            postUploadSegments(segmentUploadContext);
            String str5 = (String) convert.stream().map((v0) -> {
                return v0.getSegmentName();
            }).collect(Collectors.joining(","));
            postProcess(pinotTaskConfig);
            LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: {}", new Object[]{taskType, str, str2, str5});
            FileUtils.deleteQuietly(file);
            return convert;
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    private void pushSegment(String str, Map<String, String> map, URI uri, List<Header> list, List<NameValuePair> list2, SegmentConversionResult segmentConversionResult) throws Exception {
        String orDefault = map.getOrDefault("push.mode", BatchConfigProperties.SegmentPushType.TAR.name());
        LOGGER.info("Trying to push Pinot segment with push mode {} from {}", orDefault, uri);
        PushJobSpec pushJobSpec = new PushJobSpec();
        pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
        pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
        pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
        pushJobSpec.setSegmentUriPrefix(map.get("push.segmentUriPrefix"));
        pushJobSpec.setSegmentUriSuffix(map.get("push.segmentUriSuffix"));
        SegmentGenerationJobSpec generatePushJobSpec = generatePushJobSpec(str, map, pushJobSpec);
        switch (AnonymousClass1.$SwitchMap$org$apache$pinot$spi$ingestion$batch$BatchConfigProperties$SegmentPushType[BatchConfigProperties.SegmentPushType.valueOf(orDefault.toUpperCase()).ordinal()]) {
            case DEFAULT_PUSH_PARALLELISM /* 1 */:
                SegmentConversionUtils.uploadSegment(map, list, list2, segmentConversionResult.getTableNameWithType(), segmentConversionResult.getSegmentName(), map.get("uploadURL"), new File(uri));
                return;
            case 2:
                if (!map.containsKey("output.segment.dir.uri")) {
                    throw new RuntimeException("Output dir URI missing for metadata push");
                }
                URI create = URI.create(map.get("output.segment.dir.uri"));
                PinotFS outputPinotFS = MinionTaskUtils.getOutputPinotFS(map, create);
                try {
                    SegmentPushUtils.sendSegmentUriAndMetadata(generatePushJobSpec, outputPinotFS, SegmentPushUtils.getSegmentUriToTarPathMap(create, pushJobSpec, new String[]{uri.toString()}), list, list2);
                    if (outputPinotFS != null) {
                        outputPinotFS.close();
                        return;
                    }
                    return;
                } catch (Throwable th) {
                    if (outputPinotFS != null) {
                        try {
                            outputPinotFS.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            default:
                throw new UnsupportedOperationException("Unrecognized push mode - " + orDefault);
        }
    }

    private SegmentGenerationJobSpec generatePushJobSpec(String str, Map<String, String> map, PushJobSpec pushJobSpec) {
        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName(str);
        PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
        pinotClusterSpec.setControllerURI(map.get("push.controllerUri"));
        PinotClusterSpec[] pinotClusterSpecArr = {pinotClusterSpec};
        SegmentGenerationJobSpec segmentGenerationJobSpec = new SegmentGenerationJobSpec();
        segmentGenerationJobSpec.setPushJobSpec(pushJobSpec);
        segmentGenerationJobSpec.setTableSpec(tableSpec);
        segmentGenerationJobSpec.setPinotClusterSpecs(pinotClusterSpecArr);
        segmentGenerationJobSpec.setAuthToken(map.get("authToken"));
        return segmentGenerationJobSpec;
    }

    private URI moveSegmentToOutputPinotFS(Map<String, String> map, File file) throws Exception {
        URI create = URI.create(map.get("output.segment.dir.uri"));
        PinotFS outputPinotFS = MinionTaskUtils.getOutputPinotFS(map, create);
        try {
            URI create2 = URI.create(MinionTaskUtils.normalizeDirectoryURI(create) + file.getName());
            if (!Boolean.parseBoolean(map.get("overwriteOutput")) && outputPinotFS.exists(create2)) {
                throw new RuntimeException(String.format("Output file: %s already exists. Set 'overwriteOutput' to true to ignore this error", create2));
            }
            outputPinotFS.copyFromLocalFile(file, create2);
            if (outputPinotFS != null) {
                outputPinotFS.close();
            }
            return create2;
        } catch (Throwable th) {
            if (outputPinotFS != null) {
                try {
                    outputPinotFS.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
