package org.apache.pinot.common.failuredetector;

import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.pinot.common.failuredetector.FailureDetector;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.MetricValueUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.class */
public class ConnectionFailureDetectorTest {
    private static final String INSTANCE_ID = "Server_localhost_1234";
    private BrokerMetrics _brokerMetrics;
    private FailureDetector _failureDetector;
    private UnhealthyServerRetrier _unhealthyServerRetrier;
    private HealthyServerNotifier _healthyServerNotifier;
    private UnhealthyServerNotifier _unhealthyServerNotifier;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest$HealthyServerNotifier.class */
    public static class HealthyServerNotifier implements Consumer<String> {
        int _notifyHealthyServerCalled = 0;

        private HealthyServerNotifier() {
        }

        @Override // java.util.function.Consumer
        public void accept(String str) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._notifyHealthyServerCalled++;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest$UnhealthyServerNotifier.class */
    public static class UnhealthyServerNotifier implements Consumer<String> {
        int _notifyUnhealthyServerCalled = 0;

        private UnhealthyServerNotifier() {
        }

        @Override // java.util.function.Consumer
        public void accept(String str) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._notifyUnhealthyServerCalled++;
        }
    }

    /* loaded from: input_file:org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest$UnhealthyServerRetrier.class */
    private static class UnhealthyServerRetrier implements Function<String, FailureDetector.ServerState> {
        int _retryUnhealthyServerCalled = 0;
        final int _numFailures;

        UnhealthyServerRetrier(int i) {
            this._numFailures = i;
        }

        @Override // java.util.function.Function
        public FailureDetector.ServerState apply(String str) {
            Assert.assertEquals(str, ConnectionFailureDetectorTest.INSTANCE_ID);
            this._retryUnhealthyServerCalled++;
            return this._retryUnhealthyServerCalled > this._numFailures ? FailureDetector.ServerState.HEALTHY : FailureDetector.ServerState.UNHEALTHY;
        }
    }

    @BeforeMethod
    public void setUp() {
        PinotConfiguration pinotConfiguration = new PinotConfiguration();
        pinotConfiguration.setProperty("pinot.broker.failure.detector.type", CommonConstants.Broker.FailureDetector.Type.CONNECTION.name());
        pinotConfiguration.setProperty("pinot.broker.failure.detector.retry.initial.delay.ms", 100);
        pinotConfiguration.setProperty("pinot.broker.failure.detector.retry.delay.factor", 1);
        this._brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        this._failureDetector = FailureDetectorFactory.getFailureDetector(pinotConfiguration, this._brokerMetrics);
        Assert.assertTrue(this._failureDetector instanceof ConnectionFailureDetector);
        this._healthyServerNotifier = new HealthyServerNotifier();
        this._failureDetector.registerHealthyServerNotifier(this._healthyServerNotifier);
        this._unhealthyServerNotifier = new UnhealthyServerNotifier();
        this._failureDetector.registerUnhealthyServerNotifier(this._unhealthyServerNotifier);
        this._failureDetector.start();
    }

    @Test
    public void testConnectionFailure() {
        verify(Collections.emptySet(), 0, 0);
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        this._failureDetector.markServerHealthy(INSTANCE_ID);
        verify(Collections.emptySet(), 1, 1);
    }

    @Test
    public void testRetryWithoutRecovery() {
        this._unhealthyServerRetrier = new UnhealthyServerRetrier(10);
        this._failureDetector.registerUnhealthyServerRetrier(this._unhealthyServerRetrier);
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        TestUtils.waitForCondition(r6 -> {
            int i = this._unhealthyServerRetrier._retryUnhealthyServerCalled;
            if (i >= 10) {
                Assert.assertEquals(i, 10);
                return Boolean.valueOf(this._failureDetector.getUnhealthyServers().isEmpty() && MetricValueUtils.getGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0 && this._unhealthyServerNotifier._notifyUnhealthyServerCalled == 1 && this._healthyServerNotifier._notifyHealthyServerCalled == 1);
            }
            Assert.assertEquals(this._failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
            Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), 1L);
            return false;
        }, 5000L, "Failed to get 10 retries");
    }

    @Test
    public void testRetryWithRecovery() {
        this._unhealthyServerRetrier = new UnhealthyServerRetrier(6);
        this._failureDetector.registerUnhealthyServerRetrier(this._unhealthyServerRetrier);
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        TestUtils.waitForCondition(r6 -> {
            int i = this._unhealthyServerRetrier._retryUnhealthyServerCalled;
            if (i >= 7) {
                Assert.assertEquals(i, 7);
                return Boolean.valueOf(this._failureDetector.getUnhealthyServers().isEmpty() && MetricValueUtils.getGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0 && this._unhealthyServerNotifier._notifyUnhealthyServerCalled == 1 && this._healthyServerNotifier._notifyHealthyServerCalled == 1);
            }
            if (i > 0 && i <= 5) {
                Assert.assertEquals(this._failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
                Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), 1L);
            }
            return false;
        }, 5000L, "Failed to get 7 retries");
        Assert.assertEquals(this._unhealthyServerRetrier._retryUnhealthyServerCalled, 7);
    }

    @Test
    public void testRetryWithMultipleUnhealthyServerRetriers() {
        this._unhealthyServerRetrier = new UnhealthyServerRetrier(5);
        this._failureDetector.registerUnhealthyServerRetrier(this._unhealthyServerRetrier);
        this._failureDetector.registerUnhealthyServerRetrier(new UnhealthyServerRetrier(2));
        this._failureDetector.registerUnhealthyServerRetrier(str -> {
            return FailureDetector.ServerState.UNKNOWN;
        });
        this._failureDetector.markServerUnhealthy(INSTANCE_ID);
        verify(Collections.singleton(INSTANCE_ID), 1, 0);
        TestUtils.waitForCondition(r6 -> {
            int i = this._unhealthyServerRetrier._retryUnhealthyServerCalled;
            if (i >= 8) {
                Assert.assertEquals(i, 8);
                return Boolean.valueOf(this._failureDetector.getUnhealthyServers().isEmpty() && MetricValueUtils.getGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0 && this._unhealthyServerNotifier._notifyUnhealthyServerCalled == 1 && this._healthyServerNotifier._notifyHealthyServerCalled == 1);
            }
            if (i > 0 && i <= 5) {
                Assert.assertEquals(this._failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID));
                Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), 1L);
            }
            return false;
        }, 5000L, "Failed to get 8 retries");
        Assert.assertEquals(this._unhealthyServerRetrier._retryUnhealthyServerCalled, 8);
    }

    private void verify(Set<String> set, int i, int i2) {
        Assert.assertEquals(this._failureDetector.getUnhealthyServers(), set);
        Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(this._brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), set.size());
        Assert.assertEquals(this._unhealthyServerNotifier._notifyUnhealthyServerCalled, i);
        Assert.assertEquals(this._healthyServerNotifier._notifyHealthyServerCalled, i2);
    }

    @AfterClass
    public void tearDown() {
        this._failureDetector.stop();
    }
}
