package org.apache.pinot.broker.failuredetector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.broker.failuredetector.FailureDetector;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link FailureDetector} that keeps track of unhealthy servers and asks the registered listeners to retry
 * them with an exponentially growing delay.
 *
 * <p>Each server marked unhealthy gets a {@link RetryInfo} that is stored in {@code _unhealthyServerRetryInfoMap}
 * and scheduled on {@code _retryInfoDelayQueue}. A single daemon thread (started in {@link #start()}) takes the
 * expired entries from the queue and, for entries still tracked in the map, invokes
 * {@code retryUnhealthyServer} on every listener, multiplies the delay by {@code _retryDelayFactor} and
 * re-schedules the entry. After {@code _maxRetries} retries the server is marked healthy again.
 *
 * <p>NOTE(review): {@code _listeners} is a plain {@link ArrayList} without synchronization; presumably
 * {@link #register} is only called before {@link #start()} - confirm against callers.
 */
@ThreadSafe
public abstract class BaseExponentialBackoffRetryFailureDetector implements FailureDetector {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseExponentialBackoffRetryFailureDetector.class);

    protected final String _name = getClass().getSimpleName();
    protected final List<FailureDetector.Listener> _listeners = new ArrayList<>();
    protected final ConcurrentHashMap<String, RetryInfo> _unhealthyServerRetryInfoMap = new ConcurrentHashMap<>();
    protected final DelayQueue<RetryInfo> _retryInfoDelayQueue = new DelayQueue<>();
    protected BrokerMetrics _brokerMetrics;
    protected long _retryInitialDelayNs;
    protected double _retryDelayFactor;
    protected int _maxRetries;
    protected Thread _retryThread;
    protected volatile boolean _running;

    /**
     * Retry bookkeeping for a single unhealthy server; ordered in the delay queue by its next retry time.
     */
    public class RetryInfo implements Delayed {
        final String _instanceId;
        // Absolute System.nanoTime() value at which the next retry becomes due.
        long _retryTimeNs;
        // Delay used to compute the next retry time; multiplied by the backoff factor after each retry.
        long _retryDelayNs;
        // Number of retries performed so far (field name kept as-is, including the spelling, for compatibility).
        int _numRetires = 0;

        RetryInfo(String str) {
            this._instanceId = str;
            this._retryTimeNs = System.nanoTime() + BaseExponentialBackoffRetryFailureDetector.this._retryInitialDelayNs;
            this._retryDelayNs = BaseExponentialBackoffRetryFailureDetector.this._retryInitialDelayNs;
        }

        /**
         * Returns the remaining time until the next retry, converted to the given unit.
         */
        @Override
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this._retryTimeNs - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        /**
         * Orders entries by retry time.
         */
        @Override
        public int compareTo(Delayed delayed) {
            RetryInfo other = (RetryInfo) delayed;
            // System.nanoTime() values may wrap around, so compare through the difference rather than directly.
            return Long.compare(this._retryTimeNs - other._retryTimeNs, 0L);
        }
    }

    /**
     * Reads the retry settings (initial delay, backoff factor, max retries) from the configuration and stores
     * the metrics handle used for the unhealthy-servers gauge.
     *
     * @param pinotConfiguration configuration to read the retry settings from
     * @param brokerMetrics metrics used to publish the number of unhealthy servers
     */
    @Override
    public void init(PinotConfiguration pinotConfiguration, BrokerMetrics brokerMetrics) {
        this._brokerMetrics = brokerMetrics;
        long retryInitialDelayMs =
                pinotConfiguration.getProperty(CommonConstants.Broker.FailureDetector.CONFIG_OF_RETRY_INITIAL_DELAY_MS, 5000L);
        this._retryInitialDelayNs = TimeUnit.MILLISECONDS.toNanos(retryInitialDelayMs);
        this._retryDelayFactor =
                pinotConfiguration.getProperty(CommonConstants.Broker.FailureDetector.CONFIG_OF_RETRY_DELAY_FACTOR, 2.0d);
        this._maxRetries = pinotConfiguration.getProperty(CommonConstants.Broker.FailureDetector.CONFIG_OF_MAX_RETRIES, 10);
        LOGGER.info("Initialized {} with retry initial delay: {}ms, exponential backoff factor: {}, max retries: {}",
                this._name, retryInitialDelayMs, this._retryDelayFactor, this._maxRetries);
    }

    /**
     * Adds a listener that is notified about health changes and retry attempts.
     *
     * @param listener listener to add
     */
    @Override
    public void register(FailureDetector.Listener listener) {
        this._listeners.add(listener);
    }

    /**
     * Starts the daemon thread that processes the retry queue.
     */
    @Override
    public void start() {
        LOGGER.info("Starting {}", this._name);
        this._running = true;
        this._retryThread = new Thread(this::runRetryLoop);
        this._retryThread.setName("failure-detector-retry");
        this._retryThread.setDaemon(true);
        this._retryThread.start();
    }

    /**
     * Body of the retry thread: blocks on the delay queue and handles each due entry until stopped.
     */
    private void runRetryLoop() {
        while (this._running) {
            try {
                RetryInfo retryInfo = this._retryInfoDelayQueue.take();
                String instanceId = retryInfo._instanceId;
                if (this._unhealthyServerRetryInfoMap.get(instanceId) != retryInfo) {
                    // The map no longer holds this exact entry, so the queued entry is stale.
                    LOGGER.info("Server: {} has been marked healthy, skipping the retry", instanceId);
                } else if (retryInfo._numRetires == this._maxRetries) {
                    LOGGER.warn("Unhealthy server: {} already reaches the max retries: {}, do not retry again and treat it as healthy so that the listeners do not lose track of the server", instanceId, this._maxRetries);
                    markServerHealthy(instanceId);
                } else {
                    LOGGER.info("Retry unhealthy server: {}", instanceId);
                    for (FailureDetector.Listener listener : this._listeners) {
                        listener.retryUnhealthyServer(instanceId, this);
                    }
                    // Back off: grow the delay and re-schedule the same entry.
                    retryInfo._retryDelayNs = (long) (retryInfo._retryDelayNs * this._retryDelayFactor);
                    retryInfo._retryTimeNs = System.nanoTime() + retryInfo._retryDelayNs;
                    retryInfo._numRetires++;
                    this._retryInfoDelayQueue.offer(retryInfo);
                }
            } catch (Exception e) {
                // An interrupt issued by stop() arrives here with _running already false and is not an error.
                if (this._running) {
                    LOGGER.error("Caught exception in the retry thread, continuing with errors", e);
                }
            }
        }
    }

    /**
     * Removes the server from the unhealthy set (if present), updates the gauge and notifies the listeners.
     *
     * @param str instance id of the server
     */
    @Override
    public void markServerHealthy(String str) {
        this._unhealthyServerRetryInfoMap.computeIfPresent(str, (instanceId, retryInfo) -> {
            LOGGER.info("Mark server: {} as healthy", str);
            // The entry is still in the map while this function runs, hence size() - 1.
            this._brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, this._unhealthyServerRetryInfoMap.size() - 1);
            for (FailureDetector.Listener listener : this._listeners) {
                listener.notifyHealthyServer(str, this);
            }
            // Returning null removes the mapping.
            return null;
        });
    }

    /**
     * Adds the server to the unhealthy set (if absent), updates the gauge, notifies the listeners and schedules
     * the first retry.
     *
     * @param str instance id of the server
     */
    @Override
    public void markServerUnhealthy(String str) {
        this._unhealthyServerRetryInfoMap.computeIfAbsent(str, instanceId -> {
            LOGGER.warn("Mark server: {} as unhealthy", str);
            // The entry is not yet in the map while this function runs, hence size() + 1.
            this._brokerMetrics.setValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS, this._unhealthyServerRetryInfoMap.size() + 1);
            for (FailureDetector.Listener listener : this._listeners) {
                listener.notifyUnhealthyServer(str, this);
            }
            RetryInfo retryInfo = new RetryInfo(instanceId);
            this._retryInfoDelayQueue.offer(retryInfo);
            return retryInfo;
        });
    }

    /**
     * Returns the key set view of the unhealthy-server map (a live view, not a copy).
     */
    @Override
    public Set<String> getUnhealthyServers() {
        return this._unhealthyServerRetryInfoMap.keySet();
    }

    /**
     * Stops the retry thread and waits for it to finish. Does nothing beyond clearing the running flag when
     * the thread was never started.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the interrupt status of the
     *         calling thread is restored before the exception is thrown
     */
    @Override
    public void stop() {
        LOGGER.info("Stopping {}", this._name);
        this._running = false;
        Thread retryThread = this._retryThread;
        if (retryThread == null) {
            // start() was never called, so there is nothing to interrupt or join.
            return;
        }
        try {
            retryThread.interrupt();
            retryThread.join();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for retry thread to finish", e);
        }
    }
}
