package org.apache.pinot.broker.failuredetector;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.broker.failuredetector.FailureDetector;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.class */
public class ConnectionFailureDetectorTest {
    private static final String INSTANCE_ID = "Server_localhost_1234";
    private BrokerMetrics _brokerMetrics;
    private FailureDetector _failureDetector;
    private Listener _listener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest$Listener.class */
    public static class Listener implements FailureDetector.Listener {
        final AtomicInteger _notifyUnhealthyServerCalled = new AtomicInteger();
        final AtomicInteger _retryUnhealthyServerCalled = new AtomicInteger();
        final AtomicInteger _notifyHealthyServerCalled = new AtomicInteger();

        private Listener() {
        }

        public void notifyUnhealthyServer(String str, FailureDetector failureDetector) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._notifyUnhealthyServerCalled.getAndIncrement();
        }

        public void retryUnhealthyServer(String str, FailureDetector failureDetector) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._retryUnhealthyServerCalled.getAndIncrement();
        }

        public void notifyHealthyServer(String str, FailureDetector failureDetector) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._notifyHealthyServerCalled.getAndIncrement();
        }

        void reset() {
            this._notifyUnhealthyServerCalled.set(0);
            this._retryUnhealthyServerCalled.set(0);
            this._notifyHealthyServerCalled.set(0);
        }
    }

    @BeforeClass
    public void setUp() {
        PinotConfiguration pinotConfiguration = new PinotConfiguration();
        pinotConfiguration.setProperty("pinot.broker.failure.detector.type", CommonConstants.Broker.FailureDetector.Type.CONNECTION.name());
        pinotConfiguration.setProperty("pinot.broker.failure.detector.retry.initial.delay.ms", 100);
        pinotConfiguration.setProperty("pinot.broker.failure.detector.retry.delay.factor", 1);
        this._brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        this._failureDetector = FailureDetectorFactory.getFailureDetector(pinotConfiguration, this._brokerMetrics);
        Assert.assertTrue(this._failureDetector instanceof ConnectionFailureDetector);
        this._listener = new Listener();
        this._failureDetector.register(this._listener);
        this._failureDetector.start();
    }

    @Test
    public void testConnectionFailure() {
        QueryResponse queryResponse = (QueryResponse) Mockito.mock(QueryResponse.class);
        Mockito.when(queryResponse.getFailedServer()).thenReturn(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE));
        this._failureDetector.notifyQuerySubmitted(queryResponse);
        verify(Collections.emptySet(), 0, 0);
        this._failureDetector.notifyQueryFinished(queryResponse);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        this._failureDetector.markServerHealthy(INSTANCE_ID);
        verify(Collections.emptySet(), 1, 1);
        this._listener.reset();
    }

    @Test
    public void testRetry() {
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        TestUtils.waitForCondition(r6 -> {
            int i = this._listener._retryUnhealthyServerCalled.get();
            if (i >= 10) {
                Assert.assertEquals(i, 10);
                return Boolean.valueOf(this._failureDetector.getUnhealthyServers().isEmpty() && this._brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS) == 0 && this._listener._notifyUnhealthyServerCalled.get() == 1 && this._listener._notifyHealthyServerCalled.get() == 1);
            }
            Assert.assertEquals(this._failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
            Assert.assertEquals(this._brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1L);
            return false;
        }, 5000L, "Failed to get 10 retires");
        this._listener.reset();
    }

    private void verify(Set<String> set, int i, int i2) {
        Assert.assertEquals(this._failureDetector.getUnhealthyServers(), set);
        Assert.assertEquals(this._brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), set.size());
        Assert.assertEquals(this._listener._notifyUnhealthyServerCalled.get(), i);
        Assert.assertEquals(this._listener._notifyHealthyServerCalled.get(), i2);
    }

    @AfterClass
    public void tearDown() {
        this._failureDetector.stop();
    }
}
