package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.class */
/**
 * Integration test for the server-level realtime consumption rate limit.
 *
 * <p>The server is configured with {@code pinot.server.consumption.rate.limit} set to
 * {@link #SERVER_RATE_LIMIT}. Each test creates one or two realtime tables, then samples the
 * {@link ServerMeter#REALTIME_ROWS_CONSUMED} meter and the tables' count-star results once per second and
 * asserts that the observed rates stay below the configured limit (with some leeway for the rate derived
 * from count-star results).
 *
 * <p>Off-heap direct allocation, the consumer directory and the lead-controller resource are toggled
 * randomly per run; the seed is logged in {@link #setUp()} so the chosen configuration can be traced.
 *
 * <p>The query tests inherited from the base classes are disabled at the bottom of this class.
 */
public class RealtimeConsumptionRateLimiterClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {
    private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
    private static final double SERVER_RATE_LIMIT = 100.0d;
    // Leeway applied to the rate derived from count-star results, which is only a coarse estimate.
    private static final double RATE_LIMIT_WITH_LEEWAY = SERVER_RATE_LIMIT * 1.5d;
    // Upper bound on how long to wait for a table to return rows, and how many samples to take afterwards.
    private static final int MAX_LOAD_WAIT_SECONDS = 60;
    private static final int NUM_RATE_SAMPLES = 10;
    private static final long SAMPLE_INTERVAL_MS = 1000L;
    private final boolean _isDirectAlloc = RANDOM.nextBoolean();
    private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
    private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
    private List<File> _avroFiles;
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumptionRateLimiterClusterIntegrationTest.class);
    private static final long RANDOM_SEED = System.currentTimeMillis();
    private static final Random RANDOM = new Random(RANDOM_SEED);

    /** Returns the segment load mode used for the tables created by this test (always mmap). */
    protected String getLoadMode() {
        return ReadMode.mmap.name();
    }

    /** Starts the controller, then applies the randomly chosen lead-controller-resource setting. */
    public void startController() throws Exception {
        super.startController();
        enableResourceConfigForLeadControllerResource(this._enableLeadControllerResource);
    }

    /**
     * Applies the server settings under test: off-heap allocation (direct or not, chosen randomly), an
     * optional consumer directory, and the server-level consumption rate limit.
     *
     * @param pinotConfiguration server configuration to mutate
     */
    @Override
    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty("pinot.server.instance.realtime.alloc.offheap", true);
        pinotConfiguration.setProperty("pinot.server.instance.realtime.alloc.offheap.direct", Boolean.valueOf(this._isDirectAlloc));
        if (this._isConsumerDirConfigured) {
            pinotConfiguration.setProperty("pinot.server.instance.consumerDir", CONSUMER_DIRECTORY);
        }
        pinotConfiguration.setProperty("pinot.server.consumption.rate.limit", Double.valueOf(SERVER_RATE_LIMIT));
    }

    /** Builds an ingestion config whose stream ingestion section carries the single stream config map. */
    protected IngestionConfig getIngestionConfig() {
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
        return ingestionConfig;
    }

    /** Returns twice the count reported by the parent class. */
    protected long getCountStarResult() {
        return super.getCountStarResult() * 2;
    }

    /**
     * Cleans the working directories, starts the cluster components and Kafka, and pushes the unpacked
     * Avro data into Kafka. Tables are created by the individual tests, not here.
     */
    @Override
    @BeforeClass
    public void setUp() throws Exception {
        // Log the seed and the derived flags so a failing run's configuration can be identified.
        LOGGER.info("Random seed = {}, isDirectAlloc = {}, isConsumerDirConfigured = {}, enableLeadControllerResource = {}",
            RANDOM_SEED, this._isDirectAlloc, this._isConsumerDirConfigured, this._enableLeadControllerResource);
        FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir});
        startZk();
        startController();
        startBroker();
        startServer();
        startKafka();
        this._avroFiles = unpackAvroData(this._tempDir);
        pushAvroIntoKafka(this._avroFiles);
    }

    /** Removes the consumer directory, stops all components and deletes the temp directory. */
    @Override
    @AfterClass
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    /**
     * Creates a single realtime table and checks, once per second for {@link #NUM_RATE_SAMPLES} samples,
     * that both the server meter's one-minute rate and the rate derived from count-star stay bounded.
     */
    @Test
    public void testOneTableRateLimit() throws Exception {
        String tableName = getTableName();
        try {
            addSchema(createSchema());
            // NOTE: the clock starts before table creation while the baseline count is read after the
            // table is loaded, so the derived rate is a conservative (lower) estimate.
            long startTimeMs = System.currentTimeMillis();
            addTableConfig(createRealtimeTableConfig(this._avroFiles.get(0)));
            waitForTablesLoaded(tableName);

            PinotMeter rowsConsumedMeter = ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
            long startCount = getCurrentCountStarResult(tableName);
            for (int second = 1; second <= NUM_RATE_SAMPLES; second++) {
                Thread.sleep(SAMPLE_INTERVAL_MS);
                long currentCount = getCurrentCountStarResult(tableName);
                double currentRate = rowsPerSecond(currentCount - startCount, System.currentTimeMillis() - startTimeMs);
                double meterRate = rowsConsumedMeter.oneMinuteRate();
                LOGGER.info("Second = {}, realtimeRowConsumedMeter = {}, currentCount = {}, currentRate = {}",
                    second, meterRate, currentCount, currentRate);
                Assert.assertTrue(meterRate < SERVER_RATE_LIMIT,
                    "Rate should be less than " + SERVER_RATE_LIMIT + ", serverOneMinuteRate = " + meterRate);
                Assert.assertTrue(currentRate < RATE_LIMIT_WITH_LEEWAY,
                    "Rate should be less than " + RATE_LIMIT_WITH_LEEWAY + ", currentRate = " + currentRate);
            }
        } finally {
            dropRealtimeTable(tableName);
            waitForEVToDisappear(TableNameBuilder.REALTIME.tableNameWithType(tableName));
        }
    }

    /**
     * Creates two realtime tables on the same server and checks that the server meter's one-minute rate
     * and the combined rate derived from both tables' count-star results stay bounded.
     */
    @Test
    public void testTwoTableRateLimit() throws Exception {
        String tableName1 = SimpleMinionClusterIntegrationTest.TABLE_NAME_1;
        String tableName2 = SimpleMinionClusterIntegrationTest.TABLE_NAME_2;
        try {
            Schema schema1 = createSchema();
            schema1.setSchemaName(tableName1);
            addSchema(schema1);
            Schema schema2 = createSchema();
            schema2.setSchemaName(tableName2);
            addSchema(schema2);

            long startTimeMs = System.currentTimeMillis();
            addTableConfig(createRealtimeTableConfig(tableName1));
            addTableConfig(createRealtimeTableConfig(tableName2));
            waitForTablesLoaded(tableName1, tableName2);

            PinotMeter rowsConsumedMeter = ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
            long startCount1 = getCurrentCountStarResult(tableName1);
            long startCount2 = getCurrentCountStarResult(tableName2);
            for (int second = 1; second <= NUM_RATE_SAMPLES; second++) {
                Thread.sleep(SAMPLE_INTERVAL_MS);
                long currentCount1 = getCurrentCountStarResult(tableName1);
                long currentCount2 = getCurrentCountStarResult(tableName2);
                long currentServerCount = currentCount1 + currentCount2;
                long elapsedMs = System.currentTimeMillis() - startTimeMs;
                double currentRate1 = rowsPerSecond(currentCount1 - startCount1, elapsedMs);
                double currentRate2 = rowsPerSecond(currentCount2 - startCount2, elapsedMs);
                double currentServerRate = currentRate1 + currentRate2;
                double meterRate = rowsConsumedMeter.oneMinuteRate();
                LOGGER.info("Second = {}, serverRowConsumedMeter = {}, currentCount1 = {}, currentRate1 = {}, currentCount2 = {}, currentRate2 = {}, currentServerCount = {}, currentServerRate = {}",
                    second, meterRate, currentCount1, currentRate1, currentCount2, currentRate2, currentServerCount,
                    currentServerRate);
                Assert.assertTrue(meterRate < SERVER_RATE_LIMIT,
                    "Rate should be less than " + SERVER_RATE_LIMIT + ", serverOneMinuteRate = " + meterRate);
                Assert.assertTrue(currentServerRate < RATE_LIMIT_WITH_LEEWAY,
                    "Whole table ingestion rate should be less than " + RATE_LIMIT_WITH_LEEWAY + ", currentRate1 = "
                        + currentRate1 + ", currentRate2 = " + currentRate2 + ", currentServerRate = "
                        + currentServerRate);
            }
        } finally {
            dropRealtimeTable(tableName1);
            dropRealtimeTable(tableName2);
            waitForEVToDisappear(TableNameBuilder.REALTIME.tableNameWithType(tableName1));
            waitForEVToDisappear(TableNameBuilder.REALTIME.tableNameWithType(tableName2));
        }
    }

    /** Builds the realtime table config for the default table name. */
    protected TableConfig createRealtimeTableConfig() {
        return createRealtimeTableConfig(getTableName());
    }

    /**
     * Builds a realtime table config for the given table name from this test's getters.
     *
     * @param str raw table name to use
     * @return the built realtime table config
     */
    protected TableConfig createRealtimeTableConfig(String str) {
        return new TableConfigBuilder(TableType.REALTIME)
            .setTableName(str)
            .setTimeColumnName(getTimeColumnName())
            .setSortedColumn(getSortedColumn())
            .setInvertedIndexColumns(getInvertedIndexColumns())
            .setNoDictionaryColumns(getNoDictionaryColumns())
            .setRangeIndexColumns(getRangeIndexColumns())
            .setBloomFilterColumns(getBloomFilterColumns())
            .setFieldConfigList(getFieldConfigs())
            .setNumReplicas(getNumReplicas())
            .setSegmentVersion(getSegmentVersion())
            .setLoadMode(getLoadMode())
            .setTaskConfig(getTaskConfig())
            .setBrokerTenant(getBrokerTenant())
            .setServerTenant(getServerTenant())
            .setIngestionConfig(getIngestionConfig())
            .setQueryConfig(getQueryConfig())
            .setStreamConfigs(getStreamConfigs())
            .setNullHandlingEnabled(getNullHandlingEnabled())
            .build();
    }

    /**
     * Converts a row delta over an elapsed wall-clock interval into rows per second.
     *
     * <p>The division is done in floating point on purpose: dividing the two {@code long} values first
     * would truncate any rate below one row per millisecond to zero and make the rate assertions vacuous.
     *
     * @param rowDelta number of rows ingested during the interval
     * @param elapsedMs length of the interval in milliseconds
     * @return ingestion rate in rows per second
     */
    private static double rowsPerSecond(long rowDelta, long elapsedMs) {
        return rowDelta / (double) elapsedMs * 1000.0d;
    }

    /**
     * Polls once per second, for at most {@link #MAX_LOAD_WAIT_SECONDS} seconds, until every given table
     * returns at least one row. Returns silently on timeout, leaving the caller to proceed.
     *
     * @param tableNames raw names of the tables to wait for
     * @throws InterruptedException if the waiting thread is interrupted while sleeping
     */
    private void waitForTablesLoaded(String... tableNames) throws InterruptedException {
        for (int attempt = 0; attempt < MAX_LOAD_WAIT_SECONDS && !areAllTablesLoaded(tableNames); attempt++) {
            Thread.sleep(1000L);
        }
    }

    /** Returns true when every given table currently reports a positive count-star result. */
    private boolean areAllTablesLoaded(String... tableNames) {
        for (String tableName : tableNames) {
            if (!isTableLoaded(tableName)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns whether the table currently reports a positive count-star result.
     *
     * @param str raw table name to query
     * @return true if the count query succeeds and returns more than zero rows, false otherwise
     */
    private boolean isTableLoaded(String str) {
        try {
            return getCurrentCountStarResult(str) > 0;
        } catch (Exception ignored) {
            // Deliberate best-effort: a failing query while polling is treated as "not loaded yet".
            return false;
        }
    }

    /**
     * Returns no table-level stream configs; the stream config is supplied through
     * {@link #getIngestionConfig()} instead.
     */
    protected Map<String, String> getStreamConfigs() {
        return null;
    }

    // The inherited query tests below are disabled for this class.

    @Override
    @Test(enabled = false)
    public void testDictionaryBasedQueries(boolean z) {
    }

    @Override
    @Test(enabled = false)
    public void testGeneratedQueries(boolean z) {
    }

    @Override
    @Test(enabled = false)
    public void testHardcodedQueries(boolean z) {
    }

    @Override
    @Test(enabled = false)
    public void testInstanceShutdown() {
    }

    @Override
    @Test(enabled = false)
    public void testQueriesFromQueryFile(boolean z) {
    }

    @Override
    @Test(enabled = false)
    public void testQueryExceptions(boolean z) {
    }

    @Override
    @Test(enabled = false)
    public void testHardcodedServerPartitionedSqlQueries() {
    }
}
