package org.apache.pinot.controller.helix.core.relocation;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.class */
public class SegmentRelocator extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SegmentRelocator.class);
    private final ExecutorService _executorService;
    private final HttpClientConnectionManager _connectionManager;
    private final boolean _enableLocalTierMigration;
    private final int _serverAdminRequestTimeoutMs;
    private final long _externalViewCheckIntervalInMs;
    private final long _externalViewStabilizationTimeoutInMs;
    private final Set<String> _waitingTables;
    private final BlockingQueue<String> _waitingQueue;

    public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, ExecutorService executorService, HttpClientConnectionManager httpClientConnectionManager) {
        super(SegmentRelocator.class.getSimpleName(), controllerConf.getSegmentRelocatorFrequencyInSeconds(), controllerConf.getSegmentRelocatorInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._executorService = executorService;
        this._connectionManager = httpClientConnectionManager;
        this._enableLocalTierMigration = controllerConf.enableSegmentRelocatorLocalTierMigration();
        this._serverAdminRequestTimeoutMs = controllerConf.getServerAdminRequestTimeoutSeconds() * 1000;
        long segmentRelocatorFrequencyInSeconds = controllerConf.getSegmentRelocatorFrequencyInSeconds() * 1000;
        this._externalViewCheckIntervalInMs = Math.min(segmentRelocatorFrequencyInSeconds, controllerConf.getSegmentRelocatorExternalViewCheckIntervalInMs());
        this._externalViewStabilizationTimeoutInMs = Math.min(segmentRelocatorFrequencyInSeconds, controllerConf.getSegmentRelocatorExternalViewStabilizationTimeoutInMs());
        if (!controllerConf.isSegmentRelocatorRebalanceTablesSequentially()) {
            this._waitingTables = null;
            this._waitingQueue = null;
        } else {
            this._waitingTables = ConcurrentHashMap.newKeySet();
            this._waitingQueue = new LinkedBlockingQueue();
            this._executorService.submit(() -> {
                LOGGER.info("Rebalance tables sequentially");
                while (true) {
                    try {
                        rebalanceWaitingTable(this::rebalanceTable);
                    } catch (InterruptedException e) {
                        LOGGER.warn("Got interrupted while rebalancing tables sequentially", (Throwable) e);
                        return;
                    }
                }
            });
        }
    }

    @Override // org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask
    protected void processTable(String str) {
        if (this._waitingTables != null) {
            putTableToWait(str);
        } else {
            LOGGER.debug("Rebalance table: {} immediately", str);
            this._executorService.submit(() -> {
                rebalanceTable(str);
            });
        }
    }

    @VisibleForTesting
    void putTableToWait(String str) {
        if (!this._waitingTables.add(str)) {
            LOGGER.debug("Table: {} is already in waiting queue", str);
        } else {
            this._waitingQueue.offer(str);
            LOGGER.debug("Table: {} is added in waiting queue, total waiting: {}", str, Integer.valueOf(this._waitingTables.size()));
        }
    }

    @VisibleForTesting
    void rebalanceWaitingTable(Consumer<String> consumer) throws InterruptedException {
        LOGGER.debug("Getting next waiting table to rebalance");
        String take = this._waitingQueue.take();
        try {
            consumer.accept(take);
        } finally {
            this._waitingTables.remove(take);
            LOGGER.debug("Rebalance done for table: {}, total waiting: {}", take, Integer.valueOf(this._waitingTables.size()));
        }
    }

    @VisibleForTesting
    BlockingQueue<String> getWaitingQueue() {
        return this._waitingQueue;
    }

    private void rebalanceTable(String str) {
        TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(str);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: {}", str);
        boolean isRealtimeTableResource = TableNameBuilder.isRealtimeTableResource(str);
        if (isRealtimeTableResource && new StreamConfig(str, IngestionConfigUtils.getStreamConfigMap(tableConfig)).hasHighLevelConsumerType()) {
            return;
        }
        boolean z = false;
        if (TierConfigUtils.shouldRelocateToTiers(tableConfig)) {
            z = true;
            LOGGER.info("Relocating segments to tiers for table: {}", str);
        }
        if (isRealtimeTableResource && InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
            z = true;
            LOGGER.info("Relocating COMPLETED segments for table: {}", str);
        }
        if (!z) {
            LOGGER.debug("No need to relocate segments of table: {}", str);
            return;
        }
        BaseConfiguration baseConfiguration = new BaseConfiguration();
        baseConfiguration.addProperty(RebalanceConfigConstants.MIN_REPLICAS_TO_KEEP_UP_FOR_NO_DOWNTIME, -1);
        baseConfiguration.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_CHECK_INTERVAL_IN_MS, Long.valueOf(this._externalViewCheckIntervalInMs));
        baseConfiguration.addProperty(RebalanceConfigConstants.EXTERNAL_VIEW_STABILIZATION_TIMEOUT_IN_MS, Long.valueOf(this._externalViewStabilizationTimeoutInMs));
        baseConfiguration.addProperty(RebalanceConfigConstants.UPDATE_TARGET_TIER, Boolean.valueOf(TierConfigUtils.shouldRelocateToTiers(tableConfig)));
        baseConfiguration.addProperty("jobId", TableRebalancer.createUniqueRebalanceJobIdentifier());
        try {
            switch (this._pinotHelixResourceManager.rebalanceTable(str, baseConfiguration, false).getStatus()) {
                case NO_OP:
                    LOGGER.info("All segments are already relocated for table: {}", str);
                    migrateToTargetTier(str);
                    break;
                case DONE:
                    LOGGER.info("Finished relocating segments for table: {}", str);
                    migrateToTargetTier(str);
                    break;
                default:
                    LOGGER.error("Relocation failed for table: {}", str);
                    break;
            }
        } catch (Throwable th) {
            LOGGER.error("Caught exception/error while rebalancing table: {}", str, th);
        }
    }

    private void migrateToTargetTier(String str) {
        if (!this._enableLocalTierMigration) {
            LOGGER.debug("Skipping migrating segments of table: {} to new tiers on hosting servers", str);
            return;
        }
        LOGGER.info("Migrating segments of table: {} to new tiers on hosting servers", str);
        try {
            triggerLocalTierMigration(str, new TableTierReader(this._executorService, this._connectionManager, this._pinotHelixResourceManager).getTableTierDetails(str, null, this._serverAdminRequestTimeoutMs, true), this._pinotHelixResourceManager.getHelixZkManager().getMessagingService());
            LOGGER.info("Migrated segments of table: {} to new tiers on hosting servers", str);
        } catch (Exception e) {
            LOGGER.error("Failed to migrate segments of table: {} to new tiers on hosting servers", str, e);
        }
    }

    @VisibleForTesting
    static void triggerLocalTierMigration(String str, TableTierReader.TableTierDetails tableTierDetails, ClusterMessagingService clusterMessagingService) {
        Map<String, Map<String, String>> segmentCurrentTiers = tableTierDetails.getSegmentCurrentTiers();
        Map<String, String> segmentTargetTiers = tableTierDetails.getSegmentTargetTiers();
        LOGGER.debug("Got segment current tiers: {} and target tiers: {}", segmentCurrentTiers, segmentTargetTiers);
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<String, String>> entry : segmentCurrentTiers.entrySet()) {
            String key = entry.getKey();
            Map<String, String> value = entry.getValue();
            String str2 = segmentTargetTiers.get(key);
            for (Map.Entry<String, String> entry2 : value.entrySet()) {
                String value2 = entry2.getValue();
                String key2 = entry2.getKey();
                if (!(value2 == null && str2 == null) && (value2 == null || !value2.equals(str2))) {
                    LOGGER.debug("Segment: {} needs to move from current tier: {} to target tier: {} on server: {}", key, TierConfigUtils.normalizeTierName(value2), TierConfigUtils.normalizeTierName(str2), key2);
                    ((Set) hashMap.computeIfAbsent(key2, str3 -> {
                        return new HashSet();
                    })).add(key);
                } else {
                    LOGGER.debug("Segment: {} is already on the target tier: {} on server: {}", key, TierConfigUtils.normalizeTierName(value2), key2);
                }
            }
        }
        if (hashMap.size() <= 0) {
            LOGGER.info("No server needs to move segments to new tiers locally");
        } else {
            LOGGER.info("Notify servers: {} to move segments to new tiers locally", hashMap.keySet());
            reloadSegmentsForLocalTierMigration(str, hashMap, clusterMessagingService);
        }
    }

    private static void reloadSegmentsForLocalTierMigration(String str, Map<String, Set<String>> map, ClusterMessagingService clusterMessagingService) {
        for (Map.Entry<String, Set<String>> entry : map.entrySet()) {
            String key = entry.getKey();
            Set<String> value = entry.getValue();
            Criteria criteria = new Criteria();
            criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
            criteria.setInstanceName(key);
            criteria.setResource(str);
            criteria.setSessionSpecific(true);
            SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(str, new ArrayList(value), false);
            LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", key, value, str);
            if (clusterMessagingService.send(criteria, segmentReloadMessage, null, -1) > 0) {
                LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", key, str);
            } else {
                LOGGER.warn("No SegmentReloadMessage sent to server: {} for table: {}", key, str);
            }
        }
    }
}
