package org.apache.pinot.controller.helix.core.rebalance;

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.class */
public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ZkBasedTableRebalanceObserver.class);
    private final String _tableNameWithType;
    private final String _rebalanceJobId;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private TableRebalanceProgressStats _tableRebalanceProgressStats;
    private int _numUpdatesToZk;

    public ZkBasedTableRebalanceObserver(String str, String str2, PinotHelixResourceManager pinotHelixResourceManager) {
        Preconditions.checkState(str != null, "Table name cannot be null");
        Preconditions.checkState(str2 != null, "rebalanceId cannot be null");
        Preconditions.checkState(pinotHelixResourceManager != null, "PinotHelixManager cannot be null");
        this._tableNameWithType = str;
        this._rebalanceJobId = str2;
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._tableRebalanceProgressStats = new TableRebalanceProgressStats();
        this._numUpdatesToZk = 0;
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onTrigger(TableRebalanceObserver.Trigger trigger, Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        switch (trigger) {
            case START_TRIGGER:
                updateOnStart(map, map2);
                trackStatsInZk();
                return;
            case IDEAL_STATE_CHANGE_TRIGGER:
                TableRebalanceProgressStats.RebalanceStateStats differenceBetweenTableRebalanceStates = getDifferenceBetweenTableRebalanceStates(map2, map);
                if (TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getCurrentToTargetConvergence(), differenceBetweenTableRebalanceStates)) {
                    this._tableRebalanceProgressStats.setCurrentToTargetConvergence(differenceBetweenTableRebalanceStates);
                    trackStatsInZk();
                    return;
                }
                return;
            case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
                TableRebalanceProgressStats.RebalanceStateStats differenceBetweenTableRebalanceStates2 = getDifferenceBetweenTableRebalanceStates(map2, map);
                if (TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), differenceBetweenTableRebalanceStates2)) {
                    this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(differenceBetweenTableRebalanceStates2);
                    trackStatsInZk();
                    return;
                }
                return;
            default:
                throw new IllegalArgumentException("Unimplemented trigger: " + trigger);
        }
    }

    private void updateOnStart(Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        Preconditions.checkState(this._tableRebalanceProgressStats.getStatus() != RebalanceResult.Status.IN_PROGRESS.toString(), "Rebalance Observer onStart called multiple times");
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
        this._tableRebalanceProgressStats.setInitialToTargetStateConvergence(getDifferenceBetweenTableRebalanceStates(map2, map));
        this._tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onSuccess(String str) {
        Preconditions.checkState(this._tableRebalanceProgressStats.getStatus() != RebalanceResult.Status.DONE.toString(), "Table Rebalance already completed");
        long currentTimeMillis = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000;
        this._tableRebalanceProgressStats.setCompletionStatusMsg(str);
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(Long.valueOf(currentTimeMillis));
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE.toString());
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStateStats = new TableRebalanceProgressStats.RebalanceStateStats();
        this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(rebalanceStateStats);
        this._tableRebalanceProgressStats.setCurrentToTargetConvergence(rebalanceStateStats);
        trackStatsInZk();
    }

    @Override // org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver
    public void onError(String str) {
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(Long.valueOf((System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000));
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED.toString());
        this._tableRebalanceProgressStats.setCompletionStatusMsg(str);
        trackStatsInZk();
    }

    public int getNumUpdatesToZk() {
        return this._numUpdatesToZk;
    }

    private void trackStatsInZk() {
        HashMap hashMap = new HashMap();
        hashMap.put("tableName", this._tableNameWithType);
        hashMap.put("jobId", this._rebalanceJobId);
        hashMap.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
        hashMap.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
        try {
            hashMap.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS, JsonUtils.objectToString(this._tableRebalanceProgressStats));
        } catch (JsonProcessingException e) {
            LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", this._rebalanceJobId, e);
        }
        this._pinotHelixResourceManager.addControllerJobToZK(this._rebalanceJobId, hashMap, ZKMetadataProvider.constructPropertyStorePathForControllerJob(ControllerJobType.TABLE_REBALANCE));
        this._numUpdatesToZk++;
        LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", Integer.valueOf(this._numUpdatesToZk), this._rebalanceJobId);
    }

    public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetweenTableRebalanceStates(Map<String, Map<String, String>> map, Map<String, Map<String, String>> map2) {
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStateStats = new TableRebalanceProgressStats.RebalanceStateStats();
        for (Map.Entry<String, Map<String, String>> entry : map.entrySet()) {
            Map<String, String> map3 = map2.get(entry.getKey());
            if (map3 == null) {
                rebalanceStateStats._segmentsMissing++;
                rebalanceStateStats._segmentsToRebalance++;
            } else {
                boolean z = true;
                for (Map.Entry<String, String> entry2 : entry.getValue().entrySet()) {
                    String value = entry2.getValue();
                    if (!value.equals("OFFLINE") && !value.equals(map3.get(entry2.getKey()))) {
                        rebalanceStateStats._replicasToRebalance++;
                        z = false;
                    }
                }
                if (!z) {
                    rebalanceStateStats._segmentsToRebalance++;
                }
            }
        }
        int size = map.size();
        rebalanceStateStats._percentSegmentsToRebalance = size == 0 ? 0.0d : (rebalanceStateStats._segmentsToRebalance / size) * 100.0d;
        return rebalanceStateStats;
    }
}
