package org.apache.pinot.broker.broker.helix;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixConstants;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives Helix change callbacks (ideal state, external view, instance config, live instance) and
 * forwards them to the registered {@link ClusterChangeHandler}s on a single dedicated daemon thread.
 *
 * <p>Callbacks only record that a change of a given type happened; at most one pending entry is
 * kept per change type, so bursts of callbacks of the same type collapse into one processing pass.
 * If no change arrives for a type within {@link #PROACTIVE_CHANGE_CHECK_INTERVAL_MS}, the handling
 * thread processes that type anyway as a proactive check.
 *
 * <p>Batch mode and pre-fetch are disabled on this listener; the callbacks assert that the lists
 * handed in by Helix are empty.
 */
@BatchMode(enabled = false)
@PreFetch(enabled = false)
public class ClusterChangeMediator implements IdealStateChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener, LiveInstanceChangeListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);

    /** Maximum time a change type may go unprocessed before a proactive check is triggered. */
    private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600000;

    /** Handlers to invoke per change type; supplied by the caller and never modified here. */
    private final Map<HelixConstants.ChangeType, List<ClusterChangeHandler>> _changeHandlersMap;

    /**
     * Pending changes: change type -> time the (first) unprocessed change was enqueued. Also used as
     * the monitor the handling thread waits on.
     */
    private final Map<HelixConstants.ChangeType, Long> _lastChangeTimeMap = new ConcurrentHashMap<>();

    /** Change type -> time the handlers last finished processing that type. */
    private final Map<HelixConstants.ChangeType, Long> _lastProcessTimeMap = new HashMap<>();

    private final Thread _clusterChangeHandlingThread;
    private volatile boolean _running;

    /**
     * Creates the mediator and its (not yet started) handling thread.
     *
     * @param map handlers to invoke for each change type
     * @param brokerMetrics metrics sink for queue-time timers and proactive-check meters
     */
    public ClusterChangeMediator(Map<HelixConstants.ChangeType, List<ClusterChangeHandler>> map, BrokerMetrics brokerMetrics) {
        _changeHandlersMap = map;

        // Treat construction time as the last process time so the first proactive check is only
        // due one full interval from now.
        long now = System.currentTimeMillis();
        for (HelixConstants.ChangeType changeType : map.keySet()) {
            _lastProcessTimeMap.put(changeType, now);
        }

        _clusterChangeHandlingThread = new Thread(() -> handleChangesUntilStopped(brokerMetrics), "ClusterChangeHandlingThread");
        _clusterChangeHandlingThread.setDaemon(true);
    }

    /**
     * Body of the handling thread: repeatedly drains pending changes (or runs overdue proactive
     * checks) for every registered change type, then sleeps until a new change is enqueued, the
     * proactive check interval elapses, or the thread is interrupted by {@link #stop()}.
     */
    private void handleChangesUntilStopped(BrokerMetrics brokerMetrics) {
        while (_running) {
            try {
                for (Map.Entry<HelixConstants.ChangeType, List<ClusterChangeHandler>> entry : _changeHandlersMap.entrySet()) {
                    if (!_running) {
                        return;
                    }
                    HelixConstants.ChangeType changeType = entry.getKey();
                    List<ClusterChangeHandler> changeHandlers = entry.getValue();
                    long now = System.currentTimeMillis();
                    Long enqueueTime = _lastChangeTimeMap.remove(changeType);
                    if (enqueueTime != null) {
                        brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, now - enqueueTime, TimeUnit.MILLISECONDS);
                        processClusterChange(changeType, changeHandlers);
                    } else if (now - _lastProcessTimeMap.get(changeType) > PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
                        LOGGER.info("Proactive check {} change", changeType);
                        brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 1L);
                        processClusterChange(changeType, changeHandlers);
                    }
                }
                synchronized (_lastChangeTimeMap) {
                    if (!_running) {
                        return;
                    }
                    // enqueueChange() inserts before notifying under this same lock, so checking
                    // for emptiness while holding the lock cannot miss a freshly enqueued change.
                    if (_lastChangeTimeMap.isEmpty()) {
                        _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS);
                    }
                }
            } catch (Exception e) {
                // Keep the thread alive on failures; stay quiet when the exception is just the
                // interrupt issued by stop() (the loop condition then ends the thread).
                if (_running) {
                    LOGGER.error("Caught exception within cluster change handling thread", e);
                }
            }
        }
    }

    /**
     * Invokes every handler for the given change type, logging timing per handler and overall, and
     * records the completion time. A failing handler is logged and does not prevent the remaining
     * handlers from running.
     */
    private synchronized void processClusterChange(HelixConstants.ChangeType changeType, List<ClusterChangeHandler> list) {
        long startTime = System.currentTimeMillis();
        LOGGER.info("Start processing {} change", changeType);
        for (ClusterChangeHandler clusterChangeHandler : list) {
            try {
                long handlerStartTime = System.currentTimeMillis();
                clusterChangeHandler.processClusterChange(changeType);
                LOGGER.info("Finish handling {} change for handler: {} in {}ms", changeType, clusterChangeHandler.getClass().getName(), System.currentTimeMillis() - handlerStartTime);
            } catch (Exception e) {
                LOGGER.error("Caught exception while handling {} change for handler: {}", changeType, clusterChangeHandler.getClass().getName(), e);
            }
        }
        long endTime = System.currentTimeMillis();
        LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - startTime);
        _lastProcessTimeMap.put(changeType, endTime);
    }

    /** Marks the mediator as running and starts the handling thread. */
    public void start() {
        LOGGER.info("Starting ClusterChangeMediator");
        _running = true;
        _clusterChangeHandlingThread.start();
    }

    /**
     * Stops accepting changes, interrupts the handling thread and waits for it to terminate.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the caller's
     *     interrupt flag is restored before the exception is thrown
     */
    public void stop() {
        LOGGER.info("Stopping ClusterChangeMediator");
        _running = false;
        try {
            _clusterChangeHandlingThread.interrupt();
            _clusterChangeHandlingThread.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for cluster change handling thread to finish", e);
        }
    }

    @Override // org.apache.helix.api.listeners.IdealStateChangeListener
    public void onIdealStateChange(List<IdealState> list, NotificationContext notificationContext) throws InterruptedException {
        assert list.isEmpty();
        enqueueChange(HelixConstants.ChangeType.IDEAL_STATE);
    }

    @Override // org.apache.helix.api.listeners.ExternalViewChangeListener
    public void onExternalViewChange(List<ExternalView> list, NotificationContext notificationContext) {
        assert list.isEmpty();
        enqueueChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
    }

    @Override // org.apache.helix.api.listeners.InstanceConfigChangeListener
    public void onInstanceConfigChange(List<InstanceConfig> list, NotificationContext notificationContext) {
        assert list.isEmpty();
        enqueueChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
    }

    @Override // org.apache.helix.api.listeners.LiveInstanceChangeListener
    public void onLiveInstanceChange(List<LiveInstance> list, NotificationContext notificationContext) {
        assert list.isEmpty();
        enqueueChange(HelixConstants.ChangeType.LIVE_INSTANCE);
    }

    /**
     * Records that a change of the given type needs processing and wakes the handling thread.
     *
     * <p>Changes are dropped once the mediator is stopped (or not yet started) and for change types
     * without registered handlers. If the handling thread is not alive, the change is processed
     * directly on the calling thread instead.
     */
    private void enqueueChange(HelixConstants.ChangeType changeType) {
        if (!_running) {
            LOGGER.warn("ClusterChangeMediator already stopped, skipping enqueuing the {} change", changeType);
            return;
        }

        // The handling thread only drains types present in _changeHandlersMap. Enqueuing any other
        // type would leave a permanent entry in _lastChangeTimeMap, so the thread would never see
        // the map empty and would spin instead of waiting; direct processing would hit a null list.
        List<ClusterChangeHandler> changeHandlers = _changeHandlersMap.get(changeType);
        if (changeHandlers == null) {
            LOGGER.warn("No cluster change handler registered for {} change, skipping it", changeType);
            return;
        }

        if (!_clusterChangeHandlingThread.isAlive()) {
            LOGGER.warn("Cluster change handling thread is not alive, directly process the {} change", changeType);
            processClusterChange(changeType, changeHandlers);
            return;
        }

        LOGGER.info("Enqueueing {} change", changeType);
        // Only the first pending change of a type needs to wake the thread; later ones are
        // covered by the entry that is already queued.
        if (_lastChangeTimeMap.putIfAbsent(changeType, System.currentTimeMillis()) == null) {
            synchronized (_lastChangeTimeMap) {
                _lastChangeTimeMap.notify();
            }
        }
    }
}
