package org.apache.pinot.server.predownload;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.server.conf.ServerConf;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.crypt.PinotCrypterFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/server/predownload/PredownloadScheduler.class */
public class PredownloadScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class);
    private static final String TMP_DIR_NAME = "tmp";
    private static final String TMP_DIR_FORMAT = "tmp-%s-%s";
    private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60;
    private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5;
    private final PropertiesConfiguration _properties;
    private final PinotConfiguration _pinotConfig;
    private final InstanceDataManagerConfig _instanceDataManagerConfig;
    private final String _clusterName;
    private final String _instanceId;
    private final String _zkAddress;

    @VisibleForTesting
    Executor _executor;

    @VisibleForTesting
    Set<String> _failedSegments;
    private PredownloadMetrics _predownloadMetrics;
    private int _numOfSkippedSegments;
    private int _numOfUnableToDownloadSegments;
    private int _numOfDownloadSegments;
    private long _totalDownloadedSizeBytes;
    private PredownloadZKClient _predownloadZkClient;
    private List<PredownloadSegmentInfo> _predownloadSegmentInfoList;
    private Map<String, PredownloadTableInfo> _tableInfoMap;

    public PredownloadScheduler(PropertiesConfiguration propertiesConfiguration) throws Exception {
        this._properties = propertiesConfiguration;
        this._clusterName = propertiesConfiguration.getString("pinot.cluster.name");
        this._zkAddress = propertiesConfiguration.getString("pinot.zk.server");
        this._instanceId = propertiesConfiguration.getString("pinot.server.instance.id");
        this._pinotConfig = new PinotConfiguration(propertiesConfiguration);
        this._instanceDataManagerConfig = new HelixInstanceDataManagerConfig(new ServerConf(this._pinotConfig).getInstanceDataManagerConfig());
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this._failedSegments = ConcurrentHashMap.newKeySet();
        this._executor = Executors.newFixedThreadPool(availableProcessors * 3);
        LOGGER.info("Created thread pool with num of threads: {}", Integer.valueOf(availableProcessors * 3));
        this._numOfSkippedSegments = 0;
        this._numOfDownloadSegments = 0;
    }

    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.pinot.server.predownload.PredownloadScheduler.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    PredownloadScheduler.LOGGER.info("Trying to stop predownload process!");
                    stop();
                } catch (Exception e) {
                    PredownloadScheduler.LOGGER.error("error shutting down predownload process : ", e);
                }
            }
        });
        long currentTimeMillis = System.currentTimeMillis();
        initializeZK();
        initializeMetricsReporter();
        initializeSegmentFetcher();
        getSegmentsInfo();
        loadSegmentsFromLocal();
        PredownloadCompletionReason downloadSegments = downloadSegments();
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        LOGGER.info("Predownload process took {} sec, tried to download {} segments, skipped {} segments and unable to download {} segments. Download size: {} MB. Download speed: {} MB/s", new Object[]{Long.valueOf(currentTimeMillis2 / 1000), Integer.valueOf(this._numOfDownloadSegments), Integer.valueOf(this._numOfSkippedSegments), Integer.valueOf(this._numOfUnableToDownloadSegments), Long.valueOf(this._totalDownloadedSizeBytes / 1048576), Long.valueOf((this._totalDownloadedSizeBytes / 1048576) / ((currentTimeMillis2 / 1000) + 1))});
        if (downloadSegments.isSucceed()) {
            this._predownloadMetrics.preDownloadSucceed(this._totalDownloadedSizeBytes, currentTimeMillis2);
        }
        PredownloadStatusRecorder.predownloadComplete(downloadSegments, this._clusterName, this._instanceId, String.join(",", this._failedSegments));
    }

    public void stop() {
        if (this._predownloadZkClient != null) {
            this._predownloadZkClient.close();
        }
        if (this._executor != null) {
            ((ThreadPoolExecutor) this._executor).shutdownNow();
        }
    }

    void initializeZK() {
        LOGGER.info("Initializing ZK client with address: {} and instanceId: {}", this._zkAddress, this._instanceId);
        this._predownloadZkClient = new PredownloadZKClient(this._zkAddress, this._clusterName, this._instanceId);
        this._predownloadZkClient.start();
    }

    void initializeMetricsReporter() {
        LOGGER.info("Initializing metrics reporter");
        this._predownloadMetrics = new PredownloadMetrics();
        PredownloadStatusRecorder.registerMetrics(this._predownloadMetrics);
    }

    @VisibleForTesting
    void getSegmentsInfo() {
        LOGGER.info("Getting segments info from ZK");
        this._predownloadSegmentInfoList = this._predownloadZkClient.getSegmentsOfInstance(this._predownloadZkClient.getDataAccessor());
        if (this._predownloadSegmentInfoList.isEmpty()) {
            PredownloadStatusRecorder.predownloadComplete(PredownloadCompletionReason.NO_SEGMENT_TO_PREDOWNLOAD, this._clusterName, this._instanceId, "");
        }
        this._tableInfoMap = new HashMap();
        this._predownloadZkClient.updateSegmentMetadata(this._predownloadSegmentInfoList, this._tableInfoMap, this._instanceDataManagerConfig);
    }

    @VisibleForTesting
    void loadSegmentsFromLocal() {
        LOGGER.info("Loading segments from local to reduce number of segments to download");
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (PredownloadSegmentInfo predownloadSegmentInfo : this._predownloadSegmentInfoList) {
            arrayList.add(CompletableFuture.runAsync(() -> {
                boolean z = false;
                try {
                    PredownloadTableInfo predownloadTableInfo = this._tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType());
                    if (predownloadTableInfo != null) {
                        z = predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo);
                    }
                } catch (Exception e) {
                    LOGGER.error("Failed to load from local for segment: {} of table: {} with issue ", new Object[]{predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType(), e});
                }
                if (z || !predownloadSegmentInfo.canBeDownloaded()) {
                    return;
                }
                this._failedSegments.add(predownloadSegmentInfo.getSegmentName());
            }, this._executor));
        }
        try {
            CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).get(LOAD_SEGMENTS_TIMEOUT_MIN, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            LOGGER.error("Task interrupted", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            LOGGER.error("Task encountered an exception", e2.getCause());
        } catch (TimeoutException e3) {
            LOGGER.error("Task timed out", e3);
        }
        LOGGER.info("Load segments from local took {} sec", Long.valueOf((System.currentTimeMillis() - currentTimeMillis) / 1000));
    }

    @VisibleForTesting
    void initializeSegmentFetcher() {
        LOGGER.info("Initializing segment fetchers");
        PinotConfiguration subset = this._pinotConfig.subset("pinot.server.segment.fetcher");
        PinotConfiguration subset2 = this._pinotConfig.subset("pinot.server.storage.factory");
        PinotConfiguration subset3 = this._pinotConfig.subset("pinot.server.crypter");
        try {
            SegmentFetcherFactory.init(subset);
            PinotFSFactory.init(subset2);
            PinotCrypterFactory.init(subset3);
        } catch (Exception e) {
            LOGGER.error("Failed to initialize segment fetcher factory: {}", e);
            PredownloadStatusRecorder.predownloadComplete(PredownloadCompletionReason.CANNOT_CONNECT_TO_DEEPSTORE, this._clusterName, this._instanceId, "");
        }
    }

    public PredownloadCompletionReason downloadSegments() {
        LOGGER.info("Downloading segments from deep store");
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (PredownloadSegmentInfo predownloadSegmentInfo : this._predownloadSegmentInfoList) {
            if (predownloadSegmentInfo.isDownloaded()) {
                this._numOfSkippedSegments++;
            } else if (predownloadSegmentInfo.canBeDownloaded()) {
                this._numOfDownloadSegments++;
                arrayList.add(CompletableFuture.runAsync(() -> {
                    try {
                        downloadSegment(predownloadSegmentInfo);
                    } catch (Exception e) {
                        LOGGER.error("Failed to download segment: {} of table: {} with issue ", new Object[]{predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType(), e});
                    }
                }, this._executor));
            } else {
                this._numOfUnableToDownloadSegments++;
            }
        }
        try {
            CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).get(DOWNLOAD_SEGMENTS_TIMEOUT_MIN, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            LOGGER.error("Task interrupted", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            LOGGER.error("Task encountered an exception", e2.getCause());
        } catch (TimeoutException e3) {
            LOGGER.error("Task timed out", e3);
        }
        LOGGER.info("Download segments from deep store took {} sec", Long.valueOf((System.currentTimeMillis() - currentTimeMillis) / 1000));
        return this._failedSegments.isEmpty() ? PredownloadCompletionReason.ALL_SEGMENTS_DOWNLOADED : PredownloadCompletionReason.SOME_SEGMENTS_DOWNLOAD_FAILED;
    }

    /* JADX WARN: Removed duplicated region for block: B:12:0x009b A[Catch: Exception -> 0x00c7, TryCatch #1 {Exception -> 0x00c7, blocks: (B:2:0x0000, B:4:0x0017, B:7:0x001e, B:8:0x003a, B:10:0x0076, B:12:0x009b, B:13:0x00a2, B:19:0x0045, B:20:0x004b, B:22:0x004f, B:23:0x0064, B:27:0x006f, B:28:0x0075), top: B:1:0x0000, inners: #0, #2 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    void downloadSegment(org.apache.pinot.server.predownload.PredownloadSegmentInfo r11) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 231
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.pinot.server.predownload.PredownloadScheduler.downloadSegment(org.apache.pinot.server.predownload.PredownloadSegmentInfo):void");
    }

    private File getTmpSegmentDataDir(PredownloadSegmentInfo predownloadSegmentInfo) throws Exception {
        PredownloadTableInfo predownloadTableInfo = this._tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType());
        if (predownloadTableInfo == null) {
            throw new PredownloadException("Table info not found for segment: " + predownloadSegmentInfo.getSegmentName());
        }
        File file = new File(new File(predownloadTableInfo.getInstanceDataManagerConfig().getInstanceDataDir() + File.separator + predownloadTableInfo.getTableConfig().getTableName(), TMP_DIR_NAME), String.format(TMP_DIR_FORMAT, predownloadSegmentInfo.getSegmentName(), UUID.randomUUID()));
        if (file.exists()) {
            FileUtils.deleteQuietly(file);
        }
        FileUtils.forceMkdir(file);
        return file;
    }

    private File downloadAndStreamUntarWithRateLimit(PredownloadSegmentInfo predownloadSegmentInfo, File file, long j) throws Exception {
        String segmentName = predownloadSegmentInfo.getSegmentName();
        String tableNameWithType = predownloadSegmentInfo.getTableNameWithType();
        LOGGER.info("Trying to download segment {} using streamed download-untar with maxStreamRateInByte {}", segmentName, Long.valueOf(j));
        String downloadUrl = predownloadSegmentInfo.getDownloadUrl();
        try {
            File fetchAndStreamUntarToLocal = SegmentFetcherFactory.fetchAndStreamUntarToLocal(downloadUrl, file, j, new AtomicInteger(0));
            LOGGER.info("Download and untarred segment: {} for table: {} from: {}", new Object[]{segmentName, tableNameWithType, downloadUrl});
            return fetchAndStreamUntarToLocal;
        } catch (AttemptsExceededException e) {
            LOGGER.error("Attempts exceeded when stream download-untarring segment: {} for table: {} from: {} to: {}", new Object[]{segmentName, tableNameWithType, downloadUrl, file});
            throw e;
        }
    }

    File downloadAndDecrypt(PredownloadSegmentInfo predownloadSegmentInfo, File file) throws Exception {
        String segmentName = predownloadSegmentInfo.getSegmentName();
        String tableNameWithType = predownloadSegmentInfo.getTableNameWithType();
        File file2 = new File(file, segmentName + ".tar.gz");
        String downloadUrl = predownloadSegmentInfo.getDownloadUrl();
        try {
            LOGGER.info("Trying to download segment {}", segmentName);
            SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, file2, predownloadSegmentInfo.getCrypterName());
            LOGGER.info("Downloaded tarred segment: {} for table: {} from: {} to: {}, file length: {}", new Object[]{segmentName, tableNameWithType, downloadUrl, file2, Long.valueOf(file2.length())});
            return file2;
        } catch (AttemptsExceededException e) {
            LOGGER.error("Attempts exceeded when downloading segment: {} for table: {} from: {} to: {}", new Object[]{segmentName, tableNameWithType, downloadUrl, file2});
            throw e;
        }
    }

    private File moveSegment(PredownloadSegmentInfo predownloadSegmentInfo, File file) throws IOException {
        try {
            File segmentDataDir = predownloadSegmentInfo.getSegmentDataDir(this._tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType()));
            FileUtils.deleteDirectory(segmentDataDir);
            FileUtils.moveDirectory(file, segmentDataDir);
            return segmentDataDir;
        } catch (Exception e) {
            LOGGER.error("Failed to move segment: {} of table: {}", predownloadSegmentInfo.getSegmentName(), predownloadSegmentInfo.getTableNameWithType());
            throw e;
        }
    }

    File untarAndMoveSegment(PredownloadSegmentInfo predownloadSegmentInfo, File file, File file2) throws IOException {
        String segmentName = predownloadSegmentInfo.getSegmentName();
        String tableNameWithType = predownloadSegmentInfo.getTableNameWithType();
        File file3 = new File(file2, segmentName);
        try {
            File file4 = (File) TarCompressionUtils.untar(file, file3).get(0);
            LOGGER.info("Uncompressed tar file: {} into target dir: {}", file, file3);
            File segmentDataDir = predownloadSegmentInfo.getSegmentDataDir(this._tableInfoMap.get(predownloadSegmentInfo.getTableNameWithType()));
            FileUtils.deleteDirectory(segmentDataDir);
            FileUtils.moveDirectory(file4, segmentDataDir);
            LOGGER.info("Successfully downloaded segment: {} of table: {} to index dir: {}", new Object[]{segmentName, tableNameWithType, segmentDataDir});
            return segmentDataDir;
        } catch (Exception e) {
            LOGGER.error("Failed to untar segment: {} of table: {} from: {} to: {}", new Object[]{segmentName, tableNameWithType, file, file3});
            throw e;
        }
    }
}
