package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.client.BrokerResponse;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.PinotClientException;
import org.apache.pinot.client.PinotClientTransport;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.class */
public class QueryQuotaClusterIntegrationTest extends BaseClusterIntegrationTest {
    private PinotClientTransport _pinotClientTransport;
    private String _brokerHostPort;
    private static volatile double _quota;
    private static volatile String _quotaSource;

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBrokers(1);
        startServers(1);
        this._brokerHostPort = "localhost:" + String.valueOf(this._brokerPorts.get(0));
        addSchema(createSchema());
        addTableConfig(createOfflineTableConfig());
        Properties properties = new Properties();
        properties.put("failOnExceptions", "FALSE");
        this._pinotClientTransport = new JsonAsyncHttpPinotClientTransportFactory().withConnectionProperties(getPinotConnectionProperties()).buildTransport();
        this._pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName(), this._pinotClientTransport);
        setQueryQuotaForApplication(null);
    }

    @AfterMethod
    void resetQuotas() throws Exception {
        addQueryQuotaToClusterConfig(null);
        addAppQueryQuotaToClusterConfig(null);
        setQueryQuotaForApplication(null);
        addQueryQuotaToDatabaseConfig(null);
        addQueryQuotaToTableConfig(null);
        this._brokerHostPort = "localhost:" + String.valueOf(this._brokerPorts.get(0));
        verifyQuotaUpdate(0.0d);
    }

    @Test
    public void testDefaultDatabaseQueryQuota() throws Exception {
        addQueryQuotaToClusterConfig(40);
        testQueryRate(40);
    }

    @Test
    public void testDefaultApplicationQueryQuota() throws Exception {
        addAppQueryQuotaToClusterConfig(50);
        testQueryRate(50);
    }

    @Test
    public void testDatabaseConfigQueryQuota() throws Exception {
        addQueryQuotaToDatabaseConfig(10);
        testQueryRate(10);
    }

    @Test
    public void testApplicationQueryQuota() throws Exception {
        setQueryQuotaForApplication(15);
        testQueryRate(15);
    }

    @Test
    public void testDefaultDatabaseQueryQuotaOverride() throws Exception {
        addQueryQuotaToClusterConfig(25);
        addQueryQuotaToDatabaseConfig(10);
        testQueryRate(10);
        addQueryQuotaToDatabaseConfig(40);
        testQueryRate(40);
    }

    @Test
    public void testDefaultApplicationQueryQuotaOverride() throws Exception {
        addAppQueryQuotaToClusterConfig(25);
        setQueryQuotaForApplication(10);
        testQueryRate(10);
        setQueryQuotaForApplication(27);
        testQueryRate(27);
        setQueryQuotaForApplication(-1);
        verifyQuotaUpdate(9.223372036854776E18d);
        runQueries(50, false);
        runQueries(20, false, "other");
        runQueries(50, true, "other");
    }

    @Test
    public void testDisabledDefaultApplicationQueryQuotaOverride() throws Exception {
        addAppQueryQuotaToClusterConfig(-1);
        verifyQuotaUpdate(9.223372036854776E18d);
        runQueries(10, false);
        setQueryQuotaForApplication(40);
        testQueryRate(40);
    }

    @Test
    public void testDatabaseQueryQuotaWithTableQueryQuota() throws Exception {
        addQueryQuotaToDatabaseConfig(25);
        addQueryQuotaToTableConfig(10);
        testQueryRate(10);
        addQueryQuotaToTableConfig(50);
        testQueryRate(25);
    }

    @Test
    public void testApplicationQueryQuotaWithTableQueryQuota() throws Exception {
        setQueryQuotaForApplication(25);
        addQueryQuotaToTableConfig(10);
        testQueryRate(10);
        addQueryQuotaToTableConfig(50);
        testQueryRate(25);
    }

    @Test
    public void testDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker() throws Exception {
        BaseBrokerStarter baseBrokerStarter = null;
        try {
            addQueryQuotaToDatabaseConfig(25);
            addQueryQuotaToTableConfig(10);
            baseBrokerStarter = startOneBroker(2);
            this._brokerHostPort = "localhost:" + baseBrokerStarter.getPort();
            testQueryRateOnBroker(5.0f);
            addQueryQuotaToTableConfig(null);
            testQueryRateOnBroker(12.5f);
            if (baseBrokerStarter != null) {
                baseBrokerStarter.stop();
            }
        } catch (Throwable th) {
            if (baseBrokerStarter != null) {
                baseBrokerStarter.stop();
            }
            throw th;
        }
    }

    @Test
    public void testApplicationAndDatabaseQueryQuotaWithTableQueryQuotaWithExtraBroker() throws Exception {
        BaseBrokerStarter baseBrokerStarter = null;
        try {
            addAppQueryQuotaToClusterConfig(null);
            addQueryQuotaToClusterConfig(null);
            setQueryQuotaForApplication(50);
            addQueryQuotaToDatabaseConfig(25);
            addQueryQuotaToTableConfig(10);
            baseBrokerStarter = startOneBroker(2);
            this._brokerHostPort = "localhost:" + baseBrokerStarter.getPort();
            testQueryRateOnBroker(5.0f);
            addQueryQuotaToTableConfig(null);
            testQueryRateOnBroker(12.5f);
            addQueryQuotaToDatabaseConfig(null);
            testQueryRateOnBroker(25.0f);
            if (baseBrokerStarter != null) {
                baseBrokerStarter.stop();
            }
        } catch (Throwable th) {
            if (baseBrokerStarter != null) {
                baseBrokerStarter.stop();
            }
            throw th;
        }
    }

    void testQueryRate(int i) {
        verifyQuotaUpdate(i);
        runQueries(i, false);
        runQueries(i * 2, true);
    }

    void testQueryRateOnBroker(float f) {
        verifyQuotaUpdate(f);
        runQueriesOnBroker(f, false);
        runQueriesOnBroker(f * 2.0f, true);
    }

    private static void sleep(long j, double d) {
        long currentTimeMillis = System.currentTimeMillis();
        long max = currentTimeMillis + ((long) Math.max(Math.ceil((j - currentTimeMillis) / d), 0.0d));
        while (currentTimeMillis < max) {
            try {
                Thread.sleep(max - currentTimeMillis);
                currentTimeMillis = System.currentTimeMillis();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void runQueries(int i, boolean z) {
        runQueries(i, z, "default");
    }

    private void runQueries(int i, boolean z, String str) {
        int i2 = 0;
        boolean z2 = false;
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        String str2 = "SET applicationName='" + str + "'; SELECT COUNT(*) FROM " + getTableName();
        int i3 = 0;
        while (i3 < i) {
            sleep(currentTimeMillis, i - i3);
            Iterator it = this._pinotConnection.execute(str2).getExceptions().iterator();
            while (true) {
                if (it.hasNext()) {
                    try {
                        if (JsonUtils.stringToJsonNode(((PinotClientException) it.next()).getMessage()).get("errorCode").intValue() == QueryErrorCode.TOO_MANY_REQUESTS.getId()) {
                            i2++;
                            z2 = i3 == i - 1;
                        }
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
            }
            i3++;
        }
        if (z) {
            Assert.assertNotEquals(Integer.valueOf(i2), 0, "Expected nonzero failures for qps: " + i + " isLastFail: " + z2);
        } else {
            Assert.assertEquals(i2, 0, "Expected zero failures for qps: " + i + " isLastFail: " + z2);
        }
    }

    private void verifyQuotaUpdate(double d) {
        try {
            TestUtils.waitForCondition(r8 -> {
                try {
                    double parseDouble = Double.parseDouble(sendGetRequest("http://" + this._brokerHostPort + "/debug/tables/queryQuota/" + getTableName() + "_OFFLINE"));
                    double parseDouble2 = Double.parseDouble(sendGetRequest("http://" + this._brokerHostPort + "/debug/databases/queryQuota/default"));
                    double parseDouble3 = Double.parseDouble(sendGetRequest("http://" + this._brokerHostPort + "/debug/applicationQuotas/default"));
                    double d2 = parseDouble == 0.0d ? 9.223372036854776E18d : parseDouble;
                    double d3 = parseDouble2 == 0.0d ? 9.223372036854776E18d : parseDouble2;
                    double d4 = parseDouble3 == 0.0d ? 9.223372036854776E18d : parseDouble3;
                    double min = Math.min(Math.min(d2, d3), d4);
                    _quota = min;
                    if (_quota == d3) {
                        _quotaSource = "database";
                    } else if (_quota == d2) {
                        _quotaSource = "table";
                    } else {
                        _quotaSource = "application";
                    }
                    return Boolean.valueOf(Math.abs(d - min) < 0.01d || (d == 0.0d && d2 == 9.223372036854776E18d && d3 == 9.223372036854776E18d && d4 == 9.223372036854776E18d));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }, 10000L, "Failed to reflect query quota on rate limiter in 5s.");
        } catch (AssertionError e) {
            String message = e.getMessage();
            double d2 = _quota;
            String str = _quotaSource;
            AssertionError assertionError = new AssertionError(message + " Expected quota:" + d + " but is: " + assertionError + " set on: " + d2, e);
            throw assertionError;
        }
    }

    private BrokerResponse executeQueryOnBroker(String str) {
        return this._pinotClientTransport.executeQuery(this._brokerHostPort, str);
    }

    private void runQueriesOnBroker(float f, boolean z) {
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        for (int i2 = 0; i2 < f; i2++) {
            sleep(currentTimeMillis, f - i2);
            Iterator elements = executeQueryOnBroker("SET applicationName='default'; SELECT COUNT(*) FROM " + getTableName()).getExceptions().elements();
            while (true) {
                if (elements.hasNext()) {
                    if (((JsonNode) elements.next()).get("errorCode").asInt() == QueryErrorCode.TOO_MANY_REQUESTS.getId()) {
                        i++;
                        break;
                    }
                } else {
                    break;
                }
            }
        }
        if (z) {
            Assert.assertTrue(i != 0, "Expected nonzero failures for qps: " + f);
        } else {
            Assert.assertEquals(i, 0, "Expected 0 failures for qps: " + f);
        }
    }

    public void addQueryQuotaToTableConfig(Integer num) throws Exception {
        TableConfig offlineTableConfig = getOfflineTableConfig();
        offlineTableConfig.setQuotaConfig(new QuotaConfig((String) null, num == null ? null : num.toString()));
        updateTableConfig(offlineTableConfig);
    }

    public void addQueryQuotaToDatabaseConfig(Integer num) throws Exception {
        String str = this._controllerRequestURLBuilder.getBaseUrl() + "/databases/default/quotas";
        if (num != null) {
            str = str + "?maxQueriesPerSecond=" + num;
        }
        HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new URI(str), (HttpEntity) null, (Map) null));
    }

    public void setQueryQuotaForApplication(Integer num) throws Exception {
        String str = this._controllerRequestURLBuilder.getBaseUrl() + "/applicationQuotas/default";
        if (num != null) {
            str = str + "?maxQueriesPerSecond=" + num;
        }
        HttpClient.wrapAndThrowHttpException(_httpClient.sendPostRequest(new URI(str), (HttpEntity) null, (Map) null));
    }

    public void addQueryQuotaToClusterConfig(Integer num) throws Exception {
        if (num == null) {
            HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new URI(this._controllerRequestURLBuilder.forClusterConfigs() + "/databaseMaxQueriesPerSecond")));
            return;
        }
        HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URI(this._controllerRequestURLBuilder.forClusterConfigs()), "{\"databaseMaxQueriesPerSecond\":\"" + num + "\"}"));
    }

    public void addAppQueryQuotaToClusterConfig(Integer num) throws Exception {
        if (num == null) {
            HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(new URI(this._controllerRequestURLBuilder.forClusterConfigs() + "/applicationMaxQueriesPerSecond")));
            return;
        }
        HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URI(this._controllerRequestURLBuilder.forClusterConfigs()), "{\"applicationMaxQueriesPerSecond\":\"" + num + "\"}"));
    }
}
