package org.apache.pinot.controller.helix.core.rebalance.tenant;

import com.google.common.collect.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceObserver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.class */
public class DefaultTenantRebalancer implements TenantRebalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
    PinotHelixResourceManager _pinotHelixResourceManager;
    ExecutorService _executorService;

    public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._executorService = executorService;
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer
    public TenantRebalanceResult rebalance(TenantRebalanceContext tenantRebalanceContext) {
        HashMap hashMap = new HashMap();
        Set<String> tenantTables = getTenantTables(tenantRebalanceContext.getTenantName());
        tenantTables.forEach(str -> {
            try {
                Configuration extractRebalanceConfig = extractRebalanceConfig(tenantRebalanceContext);
                extractRebalanceConfig.setProperty("dryRun", true);
                hashMap.put(str, this._pinotHelixResourceManager.rebalanceTable(str, extractRebalanceConfig, false));
            } catch (TableNotFoundException e) {
                hashMap.put(str, new RebalanceResult(null, RebalanceResult.Status.FAILED, e.getMessage(), null, null, null));
            }
        });
        if (tenantRebalanceContext.isDryRun().booleanValue() || tenantRebalanceContext.isDowntime().booleanValue()) {
            return new TenantRebalanceResult(null, hashMap, tenantRebalanceContext.isVerboseResult());
        }
        for (String str2 : hashMap.keySet()) {
            RebalanceResult rebalanceResult = (RebalanceResult) hashMap.get(str2);
            if (rebalanceResult.getStatus() == RebalanceResult.Status.DONE) {
                hashMap.put(str2, new RebalanceResult(rebalanceResult.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller task status for the", rebalanceResult.getInstanceAssignment(), rebalanceResult.getTierInstanceAssignment(), rebalanceResult.getSegmentAssignment()));
            }
        }
        String createUniqueRebalanceJobIdentifier = createUniqueRebalanceJobIdentifier();
        ZkBasedTenantRebalanceObserver zkBasedTenantRebalanceObserver = new ZkBasedTenantRebalanceObserver(createUniqueRebalanceJobIdentifier, tenantRebalanceContext.getTenantName(), tenantTables, this._pinotHelixResourceManager);
        zkBasedTenantRebalanceObserver.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
        LinkedList linkedList = new LinkedList();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        int max = Math.max(tenantRebalanceContext.getDegreeOfParallelism(), 1);
        Set<String> dimensionalTables = getDimensionalTables(tenantRebalanceContext.getTenantName());
        AtomicInteger atomicInteger = new AtomicInteger(max);
        try {
            if (max > 1) {
                HashSet hashSet = !tenantRebalanceContext.getParallelWhitelist().isEmpty() ? new HashSet(tenantRebalanceContext.getParallelWhitelist()) : new HashSet(tenantTables);
                if (!tenantRebalanceContext.getParallelBlacklist().isEmpty()) {
                    hashSet = Sets.difference(hashSet, tenantRebalanceContext.getParallelBlacklist());
                }
                hashSet.forEach(str3 -> {
                    if (dimensionalTables.contains(str3)) {
                        concurrentLinkedDeque.addFirst(str3);
                    } else {
                        concurrentLinkedDeque.addLast(str3);
                    }
                });
                Sets.difference(tenantTables, hashSet).forEach(str4 -> {
                    if (dimensionalTables.contains(str4)) {
                        linkedList.addFirst(str4);
                    } else {
                        linkedList.addLast(str4);
                    }
                });
            } else {
                tenantTables.forEach(str5 -> {
                    if (dimensionalTables.contains(str5)) {
                        linkedList.addFirst(str5);
                    } else {
                        linkedList.addLast(str5);
                    }
                });
            }
            for (int i = 0; i < max; i++) {
                this._executorService.submit(() -> {
                    while (true) {
                        String str6 = (String) concurrentLinkedDeque.pollFirst();
                        if (str6 == null) {
                            break;
                        }
                        Configuration extractRebalanceConfig = extractRebalanceConfig(tenantRebalanceContext);
                        extractRebalanceConfig.setProperty("dryRun", false);
                        extractRebalanceConfig.setProperty("jobId", ((RebalanceResult) hashMap.get(str6)).getJobId());
                        rebalanceTable(str6, extractRebalanceConfig, zkBasedTenantRebalanceObserver);
                    }
                    if (atomicInteger.decrementAndGet() != 0) {
                        return;
                    }
                    Configuration extractRebalanceConfig2 = extractRebalanceConfig(tenantRebalanceContext);
                    extractRebalanceConfig2.setProperty("dryRun", false);
                    while (true) {
                        String str7 = (String) linkedList.pollFirst();
                        if (str7 == null) {
                            zkBasedTenantRebalanceObserver.onSuccess(String.format("Successfully rebalanced tenant %s.", tenantRebalanceContext.getTenantName()));
                            return;
                        } else {
                            extractRebalanceConfig2.setProperty("jobId", ((RebalanceResult) hashMap.get(str7)).getJobId());
                            rebalanceTable(str7, extractRebalanceConfig2, zkBasedTenantRebalanceObserver);
                        }
                    }
                });
            }
        } catch (Exception e) {
            zkBasedTenantRebalanceObserver.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", tenantRebalanceContext.getTenantName(), e.getMessage()));
        }
        return new TenantRebalanceResult(createUniqueRebalanceJobIdentifier, hashMap, tenantRebalanceContext.isVerboseResult());
    }

    private Set<String> getDimensionalTables(String str) {
        HashSet hashSet = new HashSet();
        for (String str2 : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(str2);
            if (tableConfig == null) {
                LOGGER.error("Unable to retrieve table config for table: {}", str2);
            } else if (str.equals(tableConfig.getTenantConfig().getServer()) && tableConfig.isDimTable()) {
                hashSet.add(str2);
            }
        }
        return hashSet;
    }

    private Configuration extractRebalanceConfig(TenantRebalanceContext tenantRebalanceContext) {
        BaseConfiguration baseConfiguration = new BaseConfiguration();
        baseConfiguration.addProperty("dryRun", tenantRebalanceContext.isDryRun());
        baseConfiguration.addProperty("reassignInstances", tenantRebalanceContext.isReassignInstances());
        baseConfiguration.addProperty("includeConsuming", tenantRebalanceContext.isIncludeConsuming());
        baseConfiguration.addProperty("bootstrap", tenantRebalanceContext.isBootstrap());
        baseConfiguration.addProperty("downtime", tenantRebalanceContext.isDowntime());
        baseConfiguration.addProperty("minReplicasToKeepUpForNoDowntime", tenantRebalanceContext.getMinAvailableReplicas());
        baseConfiguration.addProperty("bestEfforts", tenantRebalanceContext.isBestEfforts());
        baseConfiguration.addProperty("externalViewCheckIntervalInMs", tenantRebalanceContext.getExternalViewCheckIntervalInMs());
        baseConfiguration.addProperty("externalViewStabilizationTimeoutInMs", tenantRebalanceContext.getExternalViewStabilizationTimeoutInMs());
        baseConfiguration.addProperty("updateTargetTier", tenantRebalanceContext.isUpdateTargetTier());
        baseConfiguration.addProperty("jobId", createUniqueRebalanceJobIdentifier());
        return baseConfiguration;
    }

    private String createUniqueRebalanceJobIdentifier() {
        return UUID.randomUUID().toString();
    }

    private Set<String> getTenantTables(String str) {
        HashSet hashSet = new HashSet();
        for (String str2 : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(str2);
            if (tableConfig == null) {
                LOGGER.error("Unable to retrieve table config for table: {}", str2);
            } else if (str.equals(tableConfig.getTenantConfig().getServer())) {
                hashSet.add(str2);
            }
        }
        return hashSet;
    }

    private void rebalanceTable(String str, Configuration configuration, TenantRebalanceObserver tenantRebalanceObserver) {
        try {
            tenantRebalanceObserver.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, str, configuration.getString("jobId"));
            RebalanceResult rebalanceTable = this._pinotHelixResourceManager.rebalanceTable(str, configuration, true);
            if (rebalanceTable.getStatus().equals(RebalanceResult.Status.DONE)) {
                tenantRebalanceObserver.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, str, null);
            } else {
                tenantRebalanceObserver.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, str, rebalanceTable.getDescription());
            }
        } catch (Throwable th) {
            tenantRebalanceObserver.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, str, String.format("Caught exception/error while rebalancing table: %s", str));
        }
    }
}
