package org.apache.pinot.segment.local.utils;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/segment/local/utils/ConsistentDataPushUtils.class */
/**
 * Static helpers that drive the "consistent data push" protocol against the controllers listed in a
 * {@link SegmentGenerationJobSpec}: a start-replace request before the upload, an end-replace request
 * after a successful upload, and a revert-replace request when the upload fails.
 *
 * <p>All table-type-specific calls in this class are issued for {@link TableType#OFFLINE}.
 */
public class ConsistentDataPushUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsistentDataPushUtils.class);
    private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
    // NOTE(review): arguments are presumably (max attempts, initial delay in ms, delay scale factor) - confirm
    // against RetryPolicies.exponentialBackoffRetryPolicy.
    private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 10000, 2.0d);
    /** Key of the segment name generator config entry that holds the segment name postfix. */
    public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";

    private ConsistentDataPushUtils() {
    }

    /**
     * Looks up the segments currently registered for the job's table on every configured controller and
     * sends a start-replace request that pairs them with the segments about to be pushed.
     *
     * @param segmentGenerationJobSpec job spec providing the table name, controllers and auth token
     * @param list names of the new segments that are going to be uploaded
     * @return map from controller URI to the segment lineage entry id returned by that controller
     * @throws Exception if listing the segments or any start-replace request fails
     */
    public static Map<URI, String> preUpload(SegmentGenerationJobSpec segmentGenerationJobSpec, List<String> list) throws Exception {
        String tableName = segmentGenerationJobSpec.getTableSpec().getTableName();
        LOGGER.info("Start consistent push for table: {}", tableName);
        Map<URI, List<String>> segmentsToReplace = getSegmentsToReplace(segmentGenerationJobSpec, tableName);
        LOGGER.info("Existing segments for table {}: {}", tableName, segmentsToReplace);
        LOGGER.info("New segments for table: {}: {}", tableName, list);
        return startReplaceSegments(segmentGenerationJobSpec, segmentsToReplace, list);
    }

    /**
     * Sends the end-replace requests for the lineage entries created by {@link #preUpload}. Does nothing
     * when {@code map} is {@code null} or empty.
     *
     * @param segmentGenerationJobSpec job spec providing the table name and auth token
     * @param map controller URI to segment lineage entry id, as returned by {@link #preUpload}
     * @throws Exception if any end-replace request fails
     */
    public static void postUpload(SegmentGenerationJobSpec segmentGenerationJobSpec, Map<URI, String> map) throws Exception {
        String tableName = segmentGenerationJobSpec.getTableSpec().getTableName();
        if (map == null || map.isEmpty()) {
            return;
        }
        LOGGER.info("End consistent push for table: {}", tableName);
        endReplaceSegments(segmentGenerationJobSpec, map);
    }

    /**
     * Builds the start-replace-segments endpoint URI for every controller in the job spec.
     *
     * @param segmentGenerationJobSpec job spec listing the Pinot cluster specs (controllers)
     * @param str table name the request is built for
     * @return map from controller URI to the matching start-replace-segments URI
     * @throws RuntimeException if a controller URI (or the derived endpoint URI) is syntactically invalid
     */
    public static Map<URI, URI> getStartReplaceSegmentUris(SegmentGenerationJobSpec segmentGenerationJobSpec, String str) {
        Map<URI, URI> startReplaceUris = new HashMap<>();
        for (PinotClusterSpec pinotClusterSpec : segmentGenerationJobSpec.getPinotClusterSpecs()) {
            try {
                URI controllerUri = new URI(pinotClusterSpec.getControllerURI());
                startReplaceUris.put(controllerUri,
                    FileUploadDownloadClient.getStartReplaceSegmentsURI(controllerUri, str, TableType.OFFLINE.toString(), true));
            } catch (URISyntaxException e) {
                // Keep the original exception as the cause so the offending position/reason is not lost.
                throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'", e);
            }
        }
        return startReplaceUris;
    }

    /**
     * Sends one start-replace request per controller, under {@code DEFAULT_RETRY_POLICY}, and collects the
     * {@code segmentLineageEntryId} field from each JSON response.
     *
     * @param segmentGenerationJobSpec job spec providing the table name, controllers and auth token
     * @param map controller URI to the segments that are being replaced on that controller
     * @param list names of the segments that replace them
     * @return map from controller URI to the segment lineage entry id returned by that controller
     * @throws Exception if the retry policy gives up or a request fails with a non-retried error
     */
    public static Map<URI, String> startReplaceSegments(SegmentGenerationJobSpec segmentGenerationJobSpec, Map<URI, List<String>> map, List<String> list) throws Exception {
        Map<URI, String> lineageEntryIds = new HashMap<>();
        String tableName = segmentGenerationJobSpec.getTableSpec().getTableName();
        Map<URI, URI> startReplaceSegmentUris = getStartReplaceSegmentUris(segmentGenerationJobSpec, tableName);
        AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(segmentGenerationJobSpec.getAuthToken());
        LOGGER.info("Start replace segment URIs: {}", startReplaceSegmentUris);
        for (Map.Entry<URI, URI> entry : startReplaceSegmentUris.entrySet()) {
            URI controllerUri = entry.getKey();
            URI requestUri = entry.getValue();
            StartReplaceSegmentsRequest request = new StartReplaceSegmentsRequest(map.get(controllerUri), list);
            DEFAULT_RETRY_POLICY.attempt(() -> {
                try {
                    SimpleHttpResponse httpResponse = FILE_UPLOAD_DOWNLOAD_CLIENT.startReplaceSegments(requestUri, request, authProvider);
                    String responseBody = httpResponse.getResponse();
                    LOGGER.info("Got response {}: {} while sending start replace segment request for table: {}, uploadURI: {}, request: {}",
                        httpResponse.getStatusCode(), responseBody, tableName, requestUri, request);
                    lineageEntryIds.put(controllerUri, JsonUtils.stringToJsonNode(responseBody).get("segmentLineageEntryId").asText());
                    return true;
                } catch (SocketTimeoutException e) {
                    // Report a failed attempt to the retry policy instead of aborting.
                    return false;
                } catch (HttpErrorStatusException e) {
                    if (e.getStatusCode() >= 500) {
                        // 5xx: report a failed attempt to the retry policy.
                        return false;
                    }
                    if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
                        LOGGER.error("Table: {} not found when sending request: {}", tableName, requestUri);
                    }
                    // Any other HTTP error is propagated out of the attempt.
                    throw e;
                }
            });
        }
        return lineageEntryIds;
    }

    /**
     * Sends one end-replace request per entry of {@code map}, under {@code DEFAULT_RETRY_POLICY}.
     *
     * @param segmentGenerationJobSpec job spec providing the table name and auth token
     * @param map controller URI to the segment lineage entry id to finish on that controller
     * @throws Exception if the retry policy gives up or a request fails with a non-retried error
     */
    public static void endReplaceSegments(SegmentGenerationJobSpec segmentGenerationJobSpec, Map<URI, String> map) throws Exception {
        AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(segmentGenerationJobSpec.getAuthToken());
        String tableName = segmentGenerationJobSpec.getTableSpec().getTableName();
        for (Map.Entry<URI, String> entry : map.entrySet()) {
            URI endReplaceSegmentsURI = FileUploadDownloadClient.getEndReplaceSegmentsURI(entry.getKey(), tableName, TableType.OFFLINE.toString(), entry.getValue());
            DEFAULT_RETRY_POLICY.attempt(() -> {
                try {
                    // NOTE(review): 600000 is presumably a socket timeout in milliseconds - confirm against the client API.
                    SimpleHttpResponse httpResponse = FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(endReplaceSegmentsURI, 600000, authProvider);
                    LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURI: {}",
                        httpResponse.getStatusCode(), httpResponse.getResponse(), tableName, endReplaceSegmentsURI);
                    return true;
                } catch (SocketTimeoutException e) {
                    // Report a failed attempt to the retry policy instead of aborting.
                    return false;
                } catch (HttpErrorStatusException e) {
                    if (e.getStatusCode() >= 500) {
                        // 5xx: report a failed attempt to the retry policy.
                        return false;
                    }
                    throw e;
                }
            });
        }
    }

    /**
     * Best-effort cleanup after a failed upload: logs {@code exc} and sends a revert-replace request for
     * every lineage entry in {@code map}. A failure of an individual revert request is logged and does not
     * stop the remaining requests. Does nothing when {@code map} is {@code null}.
     *
     * @param segmentGenerationJobSpec job spec providing the table name
     * @param map controller URI to segment lineage entry id, or {@code null} if none were created
     * @param exc the exception that made the upload fail
     */
    public static void handleUploadException(SegmentGenerationJobSpec segmentGenerationJobSpec, Map<URI, String> map, Exception exc) {
        if (map == null) {
            return;
        }
        LOGGER.error("Exception when pushing segments. Marking segment lineage entry to 'REVERTED'.", exc);
        String tableName = segmentGenerationJobSpec.getTableSpec().getTableName();
        for (Map.Entry<URI, String> entry : map.entrySet()) {
            URI controllerUri = entry.getKey();
            try {
                URI revertUri = FileUploadDownloadClient.getRevertReplaceSegmentsURI(controllerUri, tableName, TableType.OFFLINE.name(), entry.getValue(), true);
                SimpleHttpResponse httpResponse = FILE_UPLOAD_DOWNLOAD_CLIENT.revertReplaceSegments(revertUri);
                LOGGER.info("Got response {}: {} while sending revert replace segment request for table: {}, uploadURI: {}",
                    httpResponse.getStatusCode(), httpResponse.getResponse(), tableName, controllerUri);
            } catch (IOException | URISyntaxException | HttpErrorStatusException e) {
                // Deliberately not rethrown: keep reverting on the remaining controllers.
                LOGGER.error("Exception when sending revert replace segment request to controller: {} for table: {}", controllerUri, tableName, e);
            }
        }
    }

    /**
     * Tells whether consistent data push applies to the table: the batch segment ingestion type must be
     * {@code REFRESH} (case-insensitive) and the consistent-data-push flag must be enabled in the table's
     * ingestion config. The outcome is also logged.
     *
     * @param tableConfig table config to inspect
     * @return {@code true} if both conditions hold
     */
    public static boolean consistentDataPushEnabled(TableConfig tableConfig) {
        boolean enabled = "REFRESH".equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))
            && IngestionConfigUtils.getBatchSegmentIngestionConsistentDataPushEnabled(tableConfig);
        LOGGER.info("Consistent data push is: {}", enabled ? "enabled" : "disabled");
        return enabled;
    }

    /**
     * Fetches, for every controller in the job spec, the {@link TableType#OFFLINE} segment list of the table.
     *
     * @param segmentGenerationJobSpec job spec listing the Pinot cluster specs (controllers)
     * @param str table name to list segments for
     * @return map from controller URI to the value stored under the OFFLINE key of the controller's response
     *     (may be {@code null} if the response has no such key)
     * @throws RuntimeException if a controller URI is syntactically invalid
     * @throws Exception if fetching the segment list fails
     */
    public static Map<URI, List<String>> getSegmentsToReplace(SegmentGenerationJobSpec segmentGenerationJobSpec, String str) throws Exception {
        Map<URI, List<String>> segmentsByController = new HashMap<>();
        for (PinotClusterSpec pinotClusterSpec : segmentGenerationJobSpec.getPinotClusterSpecs()) {
            try {
                URI controllerUri = new URI(pinotClusterSpec.getControllerURI());
                segmentsByController.put(controllerUri,
                    FILE_UPLOAD_DOWNLOAD_CLIENT.getSegments(controllerUri, str, TableType.OFFLINE, true).get(TableType.OFFLINE.toString()));
            } catch (URISyntaxException e) {
                // Keep the original exception as the cause so the offending position/reason is not lost.
                throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'", e);
            }
        }
        return segmentsByController;
    }

    /**
     * Appends the current epoch-millis timestamp to the segment name postfix of the job spec. If a postfix
     * is already configured the result is {@code <existing>_<timestamp>}, otherwise just the timestamp.
     * A {@link SegmentNameGeneratorSpec} is created and set on the job spec when none is present.
     *
     * @param segmentGenerationJobSpec job spec whose segment name generator spec is updated in place
     */
    public static void configureSegmentPostfix(SegmentGenerationJobSpec segmentGenerationJobSpec) {
        SegmentNameGeneratorSpec segmentNameGeneratorSpec = segmentGenerationJobSpec.getSegmentNameGeneratorSpec();
        if (segmentNameGeneratorSpec == null) {
            segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
        }
        String existingPostfix = segmentNameGeneratorSpec.getConfigs().get(SEGMENT_NAME_POSTFIX);
        String timestamp = Long.toString(System.currentTimeMillis());
        String newPostfix = existingPostfix == null ? timestamp : String.join("_", existingPostfix, timestamp);
        LOGGER.info("Since consistent data push is enabled, appending current timestamp: {} to segment name postfix", timestamp);
        LOGGER.info("Segment postfix is now configured as: {}", newPostfix);
        segmentNameGeneratorSpec.addConfig(SEGMENT_NAME_POSTFIX, newPostfix);
        segmentGenerationJobSpec.setSegmentNameGeneratorSpec(segmentNameGeneratorSpec);
    }
}
