package org.apache.pinot.controller.helix.core.rebalance;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

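/**
 * Periodic controller task that scans the tracked table rebalance jobs, finds the ones that failed, were aborted or
 * appear stuck, and retries the most recent eligible one after a randomized exponential backoff delay.
 *
 * <p>Decompiled from: org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.class
 */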
public class RebalanceChecker extends ControllerPeriodicTask<Void> {
    /** Logger shared by all instances of this task. */
    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceChecker.class);

    /** Base of the exponential retry backoff; equals the factor of 2 applied by {@code getRetryDelayInMs}. */
    private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;

    /** Executor on which the actual rebalance retries are submitted by {@code retryRebalanceTable}. */
    private final ExecutorService _executorService;

    /**
     * Creates the checker as a controller periodic task named after this class.
     *
     * @param pinotHelixResourceManager resource manager passed through to the parent task
     * @param leadControllerManager lead controller manager passed through to the parent task
     * @param controllerConf supplies the run frequency and initial delay (both in seconds) of the task
     * @param controllerMetrics metrics sink passed through to the parent task
     * @param executorService executor used to run rebalance retries
     */
    public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
            LeadControllerManager leadControllerManager, ControllerConf controllerConf,
            ControllerMetrics controllerMetrics, ExecutorService executorService) {
        super(RebalanceChecker.class.getSimpleName(),
                controllerConf.getRebalanceCheckerFrequencyInSeconds(),
                controllerConf.getRebalanceCheckerInitialDelayInSeconds(),
                pinotHelixResourceManager,
                leadControllerManager,
                controllerMetrics);
        _executorService = executorService;
    }

    /**
     * Entry point of the periodic task (overrides {@code ControllerPeriodicTask#processTables}): checks the given
     * tables for rebalance jobs to retry and publishes how many tables were processed as a global gauge.
     *
     * @param list names of the tables to check
     * @param properties task properties; not read by this implementation
     */
    @Override
    protected void processTables(List<String> list, Properties properties) {
        final int numTables = list.size();
        LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);

        // De-duplicate the table names so the job filter can do constant-time lookups.
        Set<String> tableNames = new HashSet<>(list);
        final int numTablesProcessed = retryRebalanceTables(tableNames);

        _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, _taskName,
                numTablesProcessed);
        LOGGER.info("Finish processing {}/{} tables in task: {}", numTablesProcessed, numTables, _taskName);
    }

    /**
     * Loads all tracked table rebalance jobs that belong to the given tables, groups them by table and runs the
     * retry check for each table. A failure on one table is logged and metered, and does not stop the others.
     *
     * <p>The method is {@code synchronized}, so only one check runs at a time per checker instance.
     *
     * @param set names of the tables whose rebalance jobs should be checked
     * @return number of tables whose check finished without throwing
     */
    private synchronized int retryRebalanceTables(Set<String> set) {
        // Only fetch rebalance jobs whose metadata names one of the requested tables.
        Map<String, Map<String, String>> allJobs = _pinotHelixResourceManager.getAllJobs(
                Collections.singleton(ControllerJobType.TABLE_REBALANCE),
                jobMetadata -> set.contains(jobMetadata.get("tableName")));

        // table name -> (job id -> job metadata)
        Map<String, Map<String, Map<String, String>>> jobsByTable = new HashMap<>();
        allJobs.forEach((jobId, jobMetadata) -> jobsByTable
                .computeIfAbsent(jobMetadata.get("tableName"), tableName -> new HashMap<>())
                .put(jobId, jobMetadata));

        int numTablesProcessed = 0;
        for (Map.Entry<String, Map<String, Map<String, String>>> tableEntry : jobsByTable.entrySet()) {
            String tableNameWithType = tableEntry.getKey();
            Map<String, Map<String, String>> tableJobs = tableEntry.getValue();
            try {
                LOGGER.info("Start to retry rebalance for table: {} with {} rebalance jobs tracked", tableNameWithType,
                        tableJobs.size());
                retryRebalanceTable(tableNameWithType, tableJobs);
                numTablesProcessed++;
            } catch (Exception e) {
                LOGGER.error("Failed to retry rebalance for table: {}", tableNameWithType, e);
                _controllerMetrics.addMeteredTableValue(tableNameWithType + "." + _taskName,
                        ControllerMeter.PERIODIC_TASK_ERROR, 1L);
            }
        }
        return numTablesProcessed;
    }

    /**
     * Decides whether the given table needs a rebalance retry and, if so, submits one to the executor.
     *
     * <p>Steps: collect the candidate (failed/aborted/stuck) jobs; pick the latest one that has not used up its
     * attempts; wait until its backoff delay has elapsed; mark in-progress jobs of the table as aborted; then run
     * the new attempt asynchronously. Failures of the asynchronous attempt are only logged.
     *
     * @param str name of the table to check
     * @param map rebalance job metadata of the table, keyed by job id
     * @throws Exception if the job metadata cannot be processed
     * @throws IllegalStateException if a retry is due but the table config cannot be found
     */
    @VisibleForTesting
    void retryRebalanceTable(String str, Map<String, Map<String, String>> map) throws Exception {
        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs = getCandidateJobs(str, map);
        if (candidateJobs.isEmpty()) {
            LOGGER.info("Found no failed rebalance jobs for table: {}. Skip retry", str);
            return;
        }
        _controllerMetrics.addMeteredTableValue(str, ControllerMeter.TABLE_REBALANCE_FAILURE_DETECTED, 1L);

        Pair<TableRebalanceContext, Long> latestJob = getLatestJob(candidateJobs);
        if (latestJob == null) {
            LOGGER.info("Rebalance has been retried too many times for table: {}. Skip retry", str);
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.TABLE_REBALANCE_RETRY_TOO_MANY_TIMES, 1L);
            return;
        }

        TableRebalanceContext jobContext = latestJob.getLeft();
        String jobId = jobContext.getJobId();
        RebalanceConfig config = jobContext.getConfig();
        long jobStartTimeMs = latestJob.getRight();
        long retryDelayMs = getRetryDelayInMs(config.getRetryInitialDelayInMs(), jobContext.getAttemptId());
        // Compare elapsed time against the delay rather than computing "start + delay": the delay saturates at
        // Long.MAX_VALUE for high attempt counts and the sum would overflow to a negative value, which would
        // wrongly trigger an immediate retry.
        long elapsedMs = System.currentTimeMillis() - jobStartTimeMs;
        if (elapsedMs < retryDelayMs) {
            LOGGER.info("Delay retry for failed rebalance job: {} that started at: {}, by: {}ms", jobId,
                    jobStartTimeMs, retryDelayMs);
            return;
        }

        abortExistingJobs(str, _pinotHelixResourceManager);
        // The table config is read on this thread, before the retry is handed to the executor.
        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(str);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", str);
        _executorService.submit(() -> {
            try {
                retryRebalanceTableWithContext(str, tableConfig, jobContext);
            } catch (Throwable t) {
                LOGGER.error("Failed to retry rebalance for table: {} asynchronously", str, t);
            }
        });
    }

    /**
     * Runs one more rebalance attempt for the table, derived from the context of a previous attempt.
     *
     * <p>The new attempt reuses the original job id and rebalance config and increments the attempt id by one.
     * The retry is metered, and the status of the rebalance result is logged once the call returns.
     *
     * @param str name of the table to rebalance
     * @param tableConfig config of the table
     * @param tableRebalanceContext context of the previous attempt that is being retried
     */
    private void retryRebalanceTableWithContext(String str, TableConfig tableConfig,
            TableRebalanceContext tableRebalanceContext) {
        String previousJobId = tableRebalanceContext.getJobId();
        RebalanceConfig rebalanceConfig = tableRebalanceContext.getConfig();
        int nextAttemptId = tableRebalanceContext.getAttemptId() + 1;
        TableRebalanceContext retryContext =
                TableRebalanceContext.forRetry(tableRebalanceContext.getOriginalJobId(), rebalanceConfig, nextAttemptId);
        String attemptJobId = retryContext.getJobId();

        LOGGER.info("Retry rebalance job: {} for table: {} with attempt job: {}", previousJobId, str, attemptJobId);
        _controllerMetrics.addMeteredTableValue(str, ControllerMeter.TABLE_REBALANCE_RETRY, 1L);

        ZkBasedTableRebalanceObserver observer =
                new ZkBasedTableRebalanceObserver(str, attemptJobId, retryContext, _pinotHelixResourceManager);
        LOGGER.info("New attempt: {} for table: {} is done with result status: {}", attemptJobId, str,
                _pinotHelixResourceManager.rebalanceTable(str, tableConfig, attemptJobId, rebalanceConfig, observer)
                        .getStatus());
    }

    /**
     * Computes a randomized, exponentially growing retry delay.
     *
     * <p>The lower bound is {@code j * 2^(i - 1)} and the upper bound is twice the lower bound; both are truncated
     * to {@code long} and handed to {@code RandomUtils.nextLong} to draw the delay.
     *
     * @param j initial retry delay in milliseconds
     * @param i attempt id of the job being retried; the exponent is {@code i - 1}
     * @return the delay in milliseconds to wait before retrying
     */
    @VisibleForTesting
    static long getRetryDelayInMs(long j, int i) {
        double minDelayMs = j * Math.pow(RETRY_DELAY_SCALE_FACTOR, i - 1);
        double maxDelayMs = minDelayMs * RETRY_DELAY_SCALE_FACTOR;
        return RandomUtils.nextLong((long) minDelayMs, (long) maxDelayMs);
    }

    /**
     * Marks every rebalance job of the table that is still {@code IN_PROGRESS} as {@code ABORTED} by rewriting the
     * progress stats stored in its job metadata. This is best effort: a job whose stats cannot be read or written
     * is logged and left untouched, and the overall outcome is only logged.
     *
     * @param str name of the table whose jobs should be aborted
     * @param pinotHelixResourceManager resource manager used to update the job metadata
     */
    private static void abortExistingJobs(String str, PinotHelixResourceManager pinotHelixResourceManager) {
        boolean updated = pinotHelixResourceManager.updateJobsForTable(str, ControllerJobType.TABLE_REBALANCE,
                jobMetadata -> {
                    String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
                    try {
                        String statsJson = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
                        TableRebalanceProgressStats progressStats =
                                JsonUtils.stringToObject(statsJson, TableRebalanceProgressStats.class);
                        // Jobs in any other state are left as they are.
                        if (progressStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
                            LOGGER.info("Abort rebalance job: {} for table: {}", jobId, str);
                            progressStats.setStatus(RebalanceResult.Status.ABORTED);
                            jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
                                    JsonUtils.objectToString(progressStats));
                        }
                    } catch (Exception e) {
                        LOGGER.error("Failed to abort rebalance job: {} for table: {}", jobId, str, e);
                    }
                });
        LOGGER.info("Tried to abort existing jobs at best effort and done: {}", updated);
    }

    /**
     * Picks the most recently started job run that is still eligible for a retry.
     *
     * <p>Runs are grouped by original job id. A group is dropped entirely as soon as one of its runs has an attempt
     * id at or above the group's {@code maxAttempts} (read from the first run of the group). From the remaining
     * groups, the run with the greatest start time wins; on equal start times the run seen first is kept.
     *
     * @param map candidate job runs keyed by original job id; each value pairs a run's context with its start time
     * @return the latest eligible run, or {@code null} if every group has exhausted its attempts
     */
    @VisibleForTesting
    static Pair<TableRebalanceContext, Long> getLatestJob(Map<String, Set<Pair<TableRebalanceContext, Long>>> map) {
        Pair<TableRebalanceContext, Long> latestOverall = null;
        for (Map.Entry<String, Set<Pair<TableRebalanceContext, Long>>> group : map.entrySet()) {
            Set<Pair<TableRebalanceContext, Long>> jobRuns = group.getValue();
            int maxAttempts = jobRuns.iterator().next().getLeft().getConfig().getMaxAttempts();

            Pair<TableRebalanceContext, Long> latestInGroup = null;
            for (Pair<TableRebalanceContext, Long> jobRun : jobRuns) {
                if (jobRun.getLeft().getAttemptId() >= maxAttempts) {
                    // One exhausted run disqualifies the whole original job.
                    latestInGroup = null;
                    break;
                }
                if (latestInGroup == null || latestInGroup.getRight() < jobRun.getRight()) {
                    latestInGroup = jobRun;
                }
            }

            if (latestInGroup == null) {
                LOGGER.info("Rebalance job: {} had exceeded maxAttempts: {}. Skip retry", group.getKey(), maxAttempts);
                continue;
            }
            if (latestOverall == null || latestOverall.getRight() < latestInGroup.getRight()) {
                latestOverall = latestInGroup;
            }
        }
        return latestOverall;
    }

    /**
     * Collects the rebalance job runs of a table that are candidates for a retry, grouped by original job id.
     *
     * <p>Jobs without progress stats or without a job context are skipped. The remaining jobs are classified by
     * status:
     * <ul>
     *   <li>{@code DONE} / {@code NO_OP}: the original job is considered completed and removed from the result.</li>
     *   <li>{@code FAILED} / {@code ABORTED}: the run is added as a candidate.</li>
     *   <li>{@code CANCELLED}: the original job is removed from the result.</li>
     *   <li>any other status: if the job metadata was submitted within the configured heartbeat timeout, the job is
     *       treated as actively running and an empty map is returned for the whole table; otherwise the run is
     *       treated as stuck and added as a candidate.</li>
     * </ul>
     * An empty map is also returned when the most recently started job is one that has completed.
     *
     * @param str name of the table, used for logging only
     * @param map rebalance job metadata of the table, keyed by job id
     * @return candidate runs keyed by original job id, each paired with its start time; empty if nothing to retry
     * @throws Exception if the submission time, progress stats or job context of a job cannot be parsed
     */
    @VisibleForTesting
    static Map<String, Set<Pair<TableRebalanceContext, Long>>> getCandidateJobs(String str, Map<String, Map<String, String>> map) throws Exception {
        long nowMs = System.currentTimeMillis();
        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidates = new HashMap<>();
        // (job id, start time) of the most recently started job, and of the most recently started completed job.
        Pair<String, Long> latestStartedJob = null;
        Pair<String, Long> latestCompletedJob = null;
        // original job id -> id of the attempt that completed it
        Map<String, String> completedOriginalJobs = new HashMap<>();
        Set<String> cancelledOriginalJobs = new HashSet<>();

        for (Map.Entry<String, Map<String, String>> jobEntry : map.entrySet()) {
            String jobId = jobEntry.getKey();
            Map<String, String> jobMetadata = jobEntry.getValue();
            long statsUpdatedAtMs = Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));

            String statsJson = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
            if (StringUtils.isEmpty(statsJson)) {
                LOGGER.info("Skip rebalance job: {} as it has no job progress stats", jobId);
                continue;
            }
            String contextJson = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
            if (StringUtils.isEmpty(contextJson)) {
                LOGGER.info("Skip rebalance job: {} as it has no job context", jobId);
                continue;
            }

            TableRebalanceProgressStats progressStats =
                    JsonUtils.stringToObject(statsJson, TableRebalanceProgressStats.class);
            TableRebalanceContext jobContext = JsonUtils.stringToObject(contextJson, TableRebalanceContext.class);
            long startTimeMs = progressStats.getStartTimeMs();
            if (latestStartedJob == null || latestStartedJob.getRight() < startTimeMs) {
                latestStartedJob = Pair.of(jobId, startTimeMs);
            }

            String originalJobId = jobContext.getOriginalJobId();
            RebalanceResult.Status status = progressStats.getStatus();
            if (status == RebalanceResult.Status.DONE || status == RebalanceResult.Status.NO_OP) {
                LOGGER.info("Skip rebalance job: {} as it has completed with status: {}", jobId, status);
                completedOriginalJobs.put(originalJobId, jobId);
                if (latestCompletedJob == null || latestCompletedJob.getRight() < startTimeMs) {
                    latestCompletedJob = Pair.of(jobId, startTimeMs);
                }
            } else if (status == RebalanceResult.Status.FAILED || status == RebalanceResult.Status.ABORTED) {
                LOGGER.info("Found rebalance job: {} for original job: {} has been stopped with status: {}", jobId,
                        originalJobId, status);
                candidates.computeIfAbsent(originalJobId, id -> new HashSet<>()).add(Pair.of(jobContext, startTimeMs));
            } else if (status == RebalanceResult.Status.CANCELLED) {
                LOGGER.info("Found cancelled rebalance job: {} for original job: {}", jobId, originalJobId);
                cancelledOriginalJobs.add(originalJobId);
            } else {
                long heartbeatTimeoutMs = jobContext.getConfig().getHeartbeatTimeoutInMs();
                if (nowMs - statsUpdatedAtMs < heartbeatTimeoutMs) {
                    // A live job exists for the table, so nothing should be retried right now.
                    LOGGER.info("Rebalance job: {} is actively running with status updated at: {} within timeout: {}. Skip retry for table: {}",
                            jobId, statsUpdatedAtMs, heartbeatTimeoutMs, str);
                    return Collections.emptyMap();
                }
                LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId, originalJobId);
                candidates.computeIfAbsent(originalJobId, id -> new HashSet<>()).add(Pair.of(jobContext, startTimeMs));
            }
        }

        // latestStartedJob is non-null whenever latestCompletedJob is, because it is updated first for every job.
        if (latestCompletedJob != null && latestCompletedJob.getLeft().equals(latestStartedJob.getLeft())) {
            LOGGER.info("Rebalance job: {} started most recently has already done. Skip retry for table: {}",
                    latestCompletedJob.getLeft(), str);
            return Collections.emptyMap();
        }
        for (String cancelledJobId : cancelledOriginalJobs) {
            LOGGER.info("Skip original job: {} as it's cancelled", cancelledJobId);
            candidates.remove(cancelledJobId);
        }
        for (Map.Entry<String, String> completed : completedOriginalJobs.entrySet()) {
            LOGGER.info("Skip original job: {} as it's completed by attempt: {}", completed.getKey(),
                    completed.getValue());
            candidates.remove(completed.getKey());
        }
        return candidates;
    }
}
