package org.apache.pinot.plugin.ingestion.batch.spark3;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.segment.local.utils.ConsistentDataPushUtils;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.runner.IngestionJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.scheduler.JobFailed;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.class */
public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner, Serializable {
    private SegmentGenerationJobSpec _spec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner$ConsistentDataPushFailureHandler.class */
    /**
     * Spark listener that reacts to failed Spark jobs during a consistent data push.
     *
     * <p>When a job ends with a {@link JobFailed} result, the failure's exception is forwarded to
     * {@link ConsistentDataPushUtils#handleUploadException} together with the job spec and the
     * URI-to-lineage-entry-id map supplied at construction time.
     */
    public static class ConsistentDataPushFailureHandler extends SparkListener {
        private final SegmentGenerationJobSpec _spec;
        private final Map<URI, String> _uriToLineageEntryIdMap;

        /**
         * @param segmentGenerationJobSpec job spec passed through to the upload-exception handler
         * @param map                      URI to lineage entry id map passed through to the upload-exception handler
         */
        public ConsistentDataPushFailureHandler(SegmentGenerationJobSpec segmentGenerationJobSpec, Map<URI, String> map) {
            this._spec = segmentGenerationJobSpec;
            this._uriToLineageEntryIdMap = map;
        }

        /**
         * Invokes the upload-exception handler when the finished job failed; does nothing otherwise.
         *
         * @param sparkListenerJobEnd the job-end event delivered by Spark
         * @throws RuntimeException if the upload-exception handler itself throws
         */
        public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
            // jobResult() is declared with the general job-result type, so the runtime type has to be
            // checked before the failure-specific accessor can be used.
            Object result = sparkListenerJobEnd.jobResult();
            if (!(result instanceof JobFailed)) {
                return;
            }
            JobFailed failure = (JobFailed) result;
            try {
                ConsistentDataPushUtils.handleUploadException(this._spec, this._uriToLineageEntryIdMap, failure.exception());
            } catch (Exception e) {
                throw new RuntimeException("Failed to handle upload exception", e);
            }
        }
    }

    /**
     * Stores the job spec that every later step of this runner reads from.
     *
     * @param segmentGenerationJobSpec the ingestion job specification to run with
     */
    public void init(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        _spec = segmentGenerationJobSpec;
    }

    /**
     * Runs the push job: registers file systems, lists the {@code .tar.gz} files under the output
     * directory, resolves the table config URI, and then dispatches to the consistent or
     * non-consistent push path depending on the table config.
     *
     * <p>A configured push parallelism below 1 is replaced by the number of segments found.
     * NOTE(review): when no segments are found this makes the parallelism 0 — confirm how the
     * downstream Spark call behaves for that value.
     *
     * @throws Exception if any step of the push fails
     */
    public void run() throws Exception {
        setupFileSystems();

        URI outputDirURI = validateOutputDirURI(_spec.getOutputDirURI());
        PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
        List<String> segmentsToPush = getSegmentsToPush(outputDirFS, outputDirURI);

        setupTableConfigURI();
        boolean consistentPush = ConsistentDataPushUtils.consistentDataPushEnabled(
                SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), _spec.getAuthToken()));

        int configuredParallelism = _spec.getPushJobSpec().getPushParallelism();
        int pushParallelism = configuredParallelism < 1 ? segmentsToPush.size() : configuredParallelism;

        if (!consistentPush) {
            handleNonConsistentPush(segmentsToPush, outputDirFS, outputDirURI, pushParallelism);
            return;
        }
        handleConsistentPush(segmentsToPush, outputDirURI, pushParallelism);
    }

    /**
     * Pushes segment metadata for a table with consistent data push enabled.
     *
     * <p>Calls {@link ConsistentDataPushUtils#preUpload} with the segment names derived from the
     * tar paths, sends the segments (on the driver when {@code i == 1}, otherwise through a Spark
     * RDD), and finally calls {@link #executePostUpload} with the map returned by the pre-upload
     * step. In the Spark path a {@link ConsistentDataPushFailureHandler} is registered first so
     * that failed Spark jobs are reported to the upload-exception handler.
     *
     * @param list segment tar file paths to push
     * @param uri  output directory URI the segments live under
     * @param i    push parallelism; {@code 1} pushes from the driver
     * @throws Exception if sending segments or the post-upload step fails
     */
    private void handleConsistentPush(List<String> list, URI uri, int i) throws Exception {
        Map<String, String> segmentUriToTarPathMap =
                SegmentPushUtils.getSegmentUriToTarPathMap(uri, this._spec.getPushJobSpec(), list.toArray(new String[0]));
        Map<URI, String> uriToLineageEntryIdMap =
                ConsistentDataPushUtils.preUpload(this._spec, getSegmentsToReplace(segmentUriToTarPathMap));
        if (i == 1) {
            SegmentPushUtils.sendSegmentUriAndMetadata(this._spec, PinotFSFactory.create(uri.getScheme()), segmentUriToTarPathMap);
        } else {
            JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
            sparkContext.sc().addSparkListener(new ConsistentDataPushFailureHandler(this._spec, uriToLineageEntryIdMap));
            // The RDD must be typed: with a raw JavaRDD the lambda parameters below would be Object.
            JavaRDD<String> segmentsRDD = sparkContext.parallelize(list, i);
            if (this._spec.getPushJobSpec().isBatchSegmentUpload()) {
                // One upload call per partition, covering every segment in that partition.
                segmentsRDD.foreachPartition(it -> {
                    setupFileSystems();
                    List<String> segments = new ArrayList<>();
                    it.forEachRemaining(segments::add);
                    SegmentPushUtils.sendSegmentUriAndMetadata(this._spec, PinotFSFactory.create(uri.getScheme()),
                            SegmentPushUtils.getSegmentUriToTarPathMap(uri, this._spec.getPushJobSpec(), segments.toArray(new String[0])));
                });
            } else {
                // One upload call per segment.
                segmentsRDD.foreach(str -> {
                    setupFileSystems();
                    SegmentPushUtils.sendSegmentUriAndMetadata(this._spec, PinotFSFactory.create(uri.getScheme()),
                            SegmentPushUtils.getSegmentUriToTarPathMap(uri, this._spec.getPushJobSpec(), new String[]{str}));
                });
            }
        }
        executePostUpload(uriToLineageEntryIdMap);
    }

    /**
     * Pushes segments for a table without consistent data push.
     *
     * <p>With {@code i == 1} the segments are pushed from the driver via
     * {@link SegmentPushUtils#pushSegments}. Otherwise the paths are distributed through a Spark
     * RDD and each task initializes the plugin manager and file systems before sending segment
     * URI and metadata, either once per partition (batch upload) or once per segment.
     *
     * @param list    segment tar file paths to push
     * @param pinotFS file system used for the driver-side push
     * @param uri     output directory URI the segments live under
     * @param i       push parallelism; {@code 1} pushes from the driver
     * @throws Exception if the push fails
     */
    private void handleNonConsistentPush(List<String> list, PinotFS pinotFS, final URI uri, int i) throws Exception {
        if (i == 1) {
            try {
                SegmentPushUtils.pushSegments(this._spec, pinotFS, list);
            } catch (RetriableOperationException | AttemptsExceededException e) {
                throw new RuntimeException(e);
            }
        } else {
            JavaRDD<String> segmentsRDD = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate()).parallelize(list, i);
            if (this._spec.getPushJobSpec().isBatchSegmentUpload()) {
                segmentsRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
                    public void call(Iterator<String> it) throws Exception {
                        PluginManager.get().init();
                        SparkSegmentMetadataPushJobRunner.this.setupFileSystems();
                        // Drain the partition so all of its segments go out in a single upload call.
                        List<String> segments = new ArrayList<>();
                        it.forEachRemaining(segments::add);
                        try {
                            SegmentPushUtils.sendSegmentUriAndMetadata(SparkSegmentMetadataPushJobRunner.this._spec,
                                    PinotFSFactory.create(uri.getScheme()),
                                    SegmentPushUtils.getSegmentUriToTarPathMap(uri,
                                            SparkSegmentMetadataPushJobRunner.this._spec.getPushJobSpec(),
                                            segments.toArray(new String[0])));
                        } catch (RetriableOperationException | AttemptsExceededException e2) {
                            throw new RuntimeException(e2);
                        }
                    }
                });
            } else {
                segmentsRDD.foreach(new VoidFunction<String>() {
                    public void call(String str) throws Exception {
                        PluginManager.get().init();
                        SparkSegmentMetadataPushJobRunner.this.setupFileSystems();
                        try {
                            SegmentPushUtils.sendSegmentUriAndMetadata(SparkSegmentMetadataPushJobRunner.this._spec,
                                    PinotFSFactory.create(uri.getScheme()),
                                    SegmentPushUtils.getSegmentUriToTarPathMap(uri,
                                            SparkSegmentMetadataPushJobRunner.this._spec.getPushJobSpec(),
                                            new String[]{str}));
                        } catch (RetriableOperationException | AttemptsExceededException e2) {
                            throw new RuntimeException(e2);
                        }
                    }
                });
            }
        }
    }

    /**
     * Collects the paths under {@code uri} whose names end with {@code .tar.gz}.
     *
     * @param pinotFS file system used to list the directory
     * @param uri     directory to list
     * @return the matching paths, in listing order
     */
    private List<String> getSegmentsToPush(PinotFS pinotFS, URI uri) {
        List<String> tarFiles = new ArrayList<>();
        for (String path : listFiles(pinotFS, uri)) {
            if (path.endsWith(".tar.gz")) {
                tarFiles.add(path);
            }
        }
        return tarFiles;
    }

    /**
     * Registers every file system declared in the job spec with {@link PinotFSFactory}, keyed by
     * scheme and configured from the spec entry itself.
     */
    private void setupFileSystems() {
        for (PinotFSSpec fsSpec : _spec.getPinotFSSpecs()) {
            String scheme = fsSpec.getScheme();
            String fsClassName = fsSpec.getClassName();
            PinotFSFactory.register(scheme, fsClassName, new PinotConfiguration(fsSpec));
        }
    }

    /**
     * Parses the configured output directory into a URI.
     *
     * <p>A value without a scheme is treated as a local file path and converted with
     * {@link File#toURI()}.
     *
     * @param str the output directory as configured in the job spec
     * @return the parsed URI, always carrying a scheme
     * @throws RuntimeException if {@code str} is not a syntactically valid URI; the original
     *                          {@link URISyntaxException} is attached as the cause
     */
    private URI validateOutputDirURI(String str) {
        try {
            URI uri = new URI(str);
            if (uri.getScheme() == null) {
                uri = new File(str).toURI();
            }
            return uri;
        } catch (URISyntaxException e) {
            // Keep the parse error as the cause so the offending index/reason is not lost.
            throw new RuntimeException("outputDirURI is not valid - '" + str + "'", e);
        }
    }

    /**
     * Recursively lists the files under {@code uri}.
     *
     * @param pinotFS file system used for the listing
     * @param uri     directory to list
     * @return the paths returned by the file system
     * @throws RuntimeException if the listing fails; the original {@link IOException} is attached
     *                          as the cause
     */
    private String[] listFiles(PinotFS pinotFS, URI uri) throws RuntimeException {
        try {
            return pinotFS.listFiles(uri, true);
        } catch (IOException e) {
            // Keep the I/O error as the cause so the underlying failure stays diagnosable.
            throw new RuntimeException("Unable to list all files under outputDirURI - '" + uri + "'", e);
        }
    }

    /**
     * Ensures the table spec has a table config URI.
     *
     * <p>If none is set, one is generated from the controller URI of the first cluster spec and
     * the table name, and stored back on the table spec.
     *
     * @throws RuntimeException if no table config URI is set and no cluster spec is available
     */
    private void setupTableConfigURI() {
        if (_spec.getTableSpec().getTableConfigURI() != null) {
            return;
        }
        boolean noClusterSpec = _spec.getPinotClusterSpecs() == null || _spec.getPinotClusterSpecs().length == 0;
        if (noClusterSpec) {
            throw new RuntimeException("Missing property 'tableConfigURI' in 'tableSpec'");
        }
        String controllerURI = _spec.getPinotClusterSpecs()[0].getControllerURI();
        String tableName = _spec.getTableSpec().getTableName();
        _spec.getTableSpec().setTableConfigURI(SegmentGenerationUtils.generateTableConfigURI(controllerURI, tableName));
    }

    /**
     * Runs the post-upload step of a consistent push.
     *
     * <p>On failure the exception is handed to the upload-exception handler and then rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param map the URI to lineage entry id map produced by the pre-upload step
     */
    private void executePostUpload(Map<URI, String> map) {
        try {
            ConsistentDataPushUtils.postUpload(_spec, map);
        } catch (Exception postUploadFailure) {
            ConsistentDataPushUtils.handleUploadException(_spec, map, postUploadFailure);
            throw new RuntimeException(postUploadFailure);
        }
    }

    /**
     * Derives segment names from the tar paths in the given map.
     *
     * <p>For each map value, the file name is taken and its {@code .tar.gz} suffix is removed.
     *
     * @param map segment URI to tar path map; only the values are read
     * @return the segment names, in the iteration order of the map's values
     * @throws IllegalArgumentException if a file name does not end with {@code .tar.gz}
     */
    public List<String> getSegmentsToReplace(Map<String, String> map) {
        final String tarSuffix = ".tar.gz";
        List<String> segmentNames = new ArrayList<>(map.size());
        for (String tarPath : map.values()) {
            String fileName = new File(tarPath).getName();
            Preconditions.checkArgument(fileName.endsWith(tarSuffix));
            segmentNames.add(fileName.substring(0, fileName.length() - tarSuffix.length()));
        }
        return segmentNames;
    }

    /*
     * Deserialization hook for the two serializable lambdas declared in handleConsistentPush.
     *
     * It selects a lambda by the implementation method name recorded in the SerializedLambda,
     * verifies the recorded functional interface / implementation signature, and returns a lambda
     * built from the captured arguments. Anything that does not match is rejected with an
     * IllegalArgumentException.
     *
     * NOTE(review): the method is marked synthetic and its body is not valid hand-written Java
     * (a boolean initialised with -1, instance members used from a static method, an undefined
     * `r1`). It looks like decompiler output of a compiler-generated method — confirm whether it
     * should be kept in source at all.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First pass: map the implementation method name (via its hash, then equals) to a selector.
        switch (implMethodName.hashCode()) {
            case -1811363621:
                if (implMethodName.equals("lambda$handleConsistentPush$178969c4$1")) {
                    z = false;
                    break;
                }
                break;
            case -1811363620:
                if (implMethodName.equals("lambda$handleConsistentPush$178969c4$2")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second pass: validate the recorded signatures and rebuild the selected lambda.
        switch (z) {
            case false:
                // Per-partition lambda: implementation signature takes (URI, Iterator).
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner") && serializedLambda.getImplMethodSignature().equals("(Ljava/net/URI;Ljava/util/Iterator;)V")) {
                    // Captured arguments: the runner instance (0) and the output directory URI (1).
                    SparkSegmentMetadataPushJobRunner sparkSegmentMetadataPushJobRunner = (SparkSegmentMetadataPushJobRunner) serializedLambda.getCapturedArg(0);
                    URI uri = (URI) serializedLambda.getCapturedArg(1);
                    return it -> {
                        setupFileSystems();
                        ArrayList arrayList = new ArrayList();
                        Objects.requireNonNull(arrayList);
                        it.forEachRemaining((v1) -> {
                            r1.add(v1);
                        });
                        SegmentPushUtils.sendSegmentUriAndMetadata(this._spec, PinotFSFactory.create(uri.getScheme()), SegmentPushUtils.getSegmentUriToTarPathMap(uri, this._spec.getPushJobSpec(), (String[]) arrayList.toArray(new String[0])));
                    };
                }
                break;
            case true:
                // Per-segment lambda: implementation signature takes (URI, String).
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/VoidFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner") && serializedLambda.getImplMethodSignature().equals("(Ljava/net/URI;Ljava/lang/String;)V")) {
                    // Captured arguments: the runner instance (0) and the output directory URI (1).
                    SparkSegmentMetadataPushJobRunner sparkSegmentMetadataPushJobRunner2 = (SparkSegmentMetadataPushJobRunner) serializedLambda.getCapturedArg(0);
                    URI uri2 = (URI) serializedLambda.getCapturedArg(1);
                    return str -> {
                        setupFileSystems();
                        SegmentPushUtils.sendSegmentUriAndMetadata(this._spec, PinotFSFactory.create(uri2.getScheme()), SegmentPushUtils.getSegmentUriToTarPathMap(uri2, this._spec.getPushJobSpec(), new String[]{str}));
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
