package org.apache.pinot.controller.helix.core.rebalance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.class */
public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
    private final String _tableNameWithType;
    private final String _rebalanceJobId;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private final TableRebalanceProgressStats _tableRebalanceProgressStats;
    private final TableRebalanceContext _tableRebalanceContext;
    private long _lastUpdateTimeMs;
    private int _numUpdatesToZk;
    private boolean _isStopped = false;
    private RebalanceResult.Status _stopStatus;
    private final ControllerMetrics _controllerMetrics;

    public ZkBasedTableRebalanceObserver(String str, String str2, TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager pinotHelixResourceManager) {
        Preconditions.checkState(str != null, "Table name cannot be null");
        Preconditions.checkState(str2 != null, "rebalanceId cannot be null");
        Preconditions.checkState(pinotHelixResourceManager != null, "PinotHelixManager cannot be null");
        this._tableNameWithType = str;
        this._rebalanceJobId = str2;
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._tableRebalanceProgressStats = new TableRebalanceProgressStats();
        this._tableRebalanceContext = tableRebalanceContext;
        this._numUpdatesToZk = 0;
        this._controllerMetrics = ControllerMetrics.get();
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onTrigger(TableRebalanceObserver.Trigger trigger, Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        boolean z = false;
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1L);
        switch (trigger) {
            case START_TRIGGER:
                updateOnStart(map, map2);
                trackStatsInZk();
                z = true;
                break;
            case IDEAL_STATE_CHANGE_TRIGGER:
                TableRebalanceProgressStats.RebalanceStateStats differenceBetweenTableRebalanceStates = getDifferenceBetweenTableRebalanceStates(map2, map);
                if (TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getCurrentToTargetConvergence(), differenceBetweenTableRebalanceStates)) {
                    this._tableRebalanceProgressStats.setCurrentToTargetConvergence(differenceBetweenTableRebalanceStates);
                    trackStatsInZk();
                    z = true;
                    break;
                }
                break;
            case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
                TableRebalanceProgressStats.RebalanceStateStats differenceBetweenTableRebalanceStates2 = getDifferenceBetweenTableRebalanceStates(map2, map);
                if (TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), differenceBetweenTableRebalanceStates2)) {
                    this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(differenceBetweenTableRebalanceStates2);
                    trackStatsInZk();
                    z = true;
                    break;
                }
                break;
            default:
                throw new IllegalArgumentException("Unimplemented trigger: " + trigger);
        }
        long heartbeatIntervalInMs = this._tableRebalanceContext.getConfig().getHeartbeatIntervalInMs();
        if (z || System.currentTimeMillis() - this._lastUpdateTimeMs <= heartbeatIntervalInMs) {
            return;
        }
        LOGGER.debug("Update status of rebalance job: {} for table: {} after {}ms as heartbeat", new Object[]{this._rebalanceJobId, this._tableNameWithType, Long.valueOf(heartbeatIntervalInMs)});
        trackStatsInZk();
    }

    private void updateOnStart(Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        Preconditions.checkState(RebalanceResult.Status.IN_PROGRESS != this._tableRebalanceProgressStats.getStatus(), "Rebalance Observer onStart called multiple times");
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
        this._tableRebalanceProgressStats.setInitialToTargetStateConvergence(getDifferenceBetweenTableRebalanceStates(map2, map));
        this._tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onSuccess(String str) {
        Preconditions.checkState(RebalanceResult.Status.DONE != this._tableRebalanceProgressStats.getStatus(), "Table Rebalance already completed");
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0L);
        long currentTimeMillis = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000;
        this._tableRebalanceProgressStats.setCompletionStatusMsg(str);
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(Long.valueOf(currentTimeMillis));
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE);
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStateStats = new TableRebalanceProgressStats.RebalanceStateStats();
        this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(rebalanceStateStats);
        this._tableRebalanceProgressStats.setCurrentToTargetConvergence(rebalanceStateStats);
        trackStatsInZk();
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onError(String str) {
        this._controllerMetrics.setValueOfTableGauge(this._tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0L);
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(Long.valueOf((System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000));
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
        this._tableRebalanceProgressStats.setCompletionStatusMsg(str);
        trackStatsInZk();
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public boolean isStopped() {
        return this._isStopped;
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public RebalanceResult.Status getStopStatus() {
        return this._stopStatus;
    }

    public int getNumUpdatesToZk() {
        return this._numUpdatesToZk;
    }

    @VisibleForTesting
    TableRebalanceContext getTableRebalanceContext() {
        return this._tableRebalanceContext;
    }

    private void trackStatsInZk() {
        this._pinotHelixResourceManager.addControllerJobToZK(this._rebalanceJobId, createJobMetadata(this._tableNameWithType, this._rebalanceJobId, this._tableRebalanceProgressStats, this._tableRebalanceContext), "TABLE_REBALANCE", map -> {
            if (map == null) {
                return true;
            }
            try {
                TableRebalanceProgressStats tableRebalanceProgressStats = (TableRebalanceProgressStats) JsonUtils.stringToObject((String) map.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS), TableRebalanceProgressStats.class);
                if (tableRebalanceProgressStats == null || RebalanceResult.Status.IN_PROGRESS == tableRebalanceProgressStats.getStatus()) {
                    return true;
                }
                this._isStopped = true;
                this._stopStatus = tableRebalanceProgressStats.getStatus();
                LOGGER.warn("Rebalance job: {} for table: {} has already stopped with status: {}", new Object[]{this._rebalanceJobId, this._tableNameWithType, this._stopStatus});
                return false;
            } catch (JsonProcessingException e) {
                return true;
            }
        });
        this._numUpdatesToZk++;
        this._lastUpdateTimeMs = System.currentTimeMillis();
        LOGGER.debug("Made {} ZK updates for rebalance job: {} of table: {}", new Object[]{Integer.valueOf(this._numUpdatesToZk), this._rebalanceJobId, this._tableNameWithType});
    }

    @VisibleForTesting
    static Map<String, String> createJobMetadata(String str, String str2, TableRebalanceProgressStats tableRebalanceProgressStats, TableRebalanceContext tableRebalanceContext) {
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.TABLE_NAME, str);
        hashMap.put("jobId", str2);
        hashMap.put("submissionTimeMs", Long.toString(System.currentTimeMillis()));
        hashMap.put("jobType", "TABLE_REBALANCE");
        try {
            hashMap.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(tableRebalanceProgressStats));
        } catch (JsonProcessingException e) {
            LOGGER.error("Error serialising stats for rebalance job: {} of table: {} to keep in ZK", new Object[]{str2, str, e});
        }
        try {
            hashMap.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT, JsonUtils.objectToString(tableRebalanceContext));
        } catch (JsonProcessingException e2) {
            LOGGER.error("Error serialising retry configs for rebalance job: {} of table: {} to keep in ZK", new Object[]{str2, str, e2});
        }
        return hashMap;
    }

    public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetweenTableRebalanceStates(Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStateStats = new TableRebalanceProgressStats.RebalanceStateStats();
        for (Map.Entry<String, Map<String, String>> entry : map.entrySet()) {
            Map<String, String> map3 = map2.get(entry.getKey());
            if (map3 == null) {
                rebalanceStateStats._segmentsMissing++;
                rebalanceStateStats._segmentsToRebalance++;
            } else {
                boolean z = true;
                for (Map.Entry<String, String> entry2 : entry.getValue().entrySet()) {
                    String value = entry2.getValue();
                    if (!value.equals("OFFLINE") && !value.equals(map3.get(entry2.getKey()))) {
                        rebalanceStateStats._replicasToRebalance++;
                        z = false;
                    }
                }
                if (!z) {
                    rebalanceStateStats._segmentsToRebalance++;
                }
            }
        }
        int size = map.size();
        rebalanceStateStats._percentSegmentsToRebalance = size == 0 ? 0.0d : (rebalanceStateStats._segmentsToRebalance / size) * 100.0d;
        return rebalanceStateStats;
    }
}
