package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.class */
public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {
    private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
    private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime");
    private static final long RANDOM_SEED = System.currentTimeMillis();
    private static final Random RANDOM = new Random(RANDOM_SEED);
    private final boolean _isDirectAlloc = RANDOM.nextBoolean();
    private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
    private final boolean _enableSplitCommit = RANDOM.nextBoolean();
    private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
    private final long _startTime = System.currentTimeMillis();

    protected boolean injectTombstones() {
        return true;
    }

    protected String getLoadMode() {
        return ReadMode.mmap.name();
    }

    public void startController() throws Exception {
        Map defaultControllerConfiguration = getDefaultControllerConfiguration();
        defaultControllerConfiguration.put("controller.allow.hlc.tables", false);
        defaultControllerConfiguration.put("controller.enable.split.commit", Boolean.valueOf(this._enableSplitCommit));
        startController(defaultControllerConfiguration);
        enableResourceConfigForLeadControllerResource(this._enableLeadControllerResource);
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty("pinot.server.instance.realtime.alloc.offheap", true);
        pinotConfiguration.setProperty("pinot.server.instance.realtime.alloc.offheap.direct", Boolean.valueOf(this._isDirectAlloc));
        if (this._isConsumerDirConfigured) {
            pinotConfiguration.setProperty("pinot.server.instance.consumerDir", CONSUMER_DIRECTORY);
        }
        if (this._enableSplitCommit) {
            pinotConfiguration.setProperty("pinot.server.instance.enable.split.commit", true);
            pinotConfiguration.setProperty("pinot.server.instance.enable.commitend.metadata", true);
        }
    }

    protected IngestionConfig getIngestionConfig() {
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
        return ingestionConfig;
    }

    protected Map<String, String> getStreamConfigs() {
        return null;
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    protected void createSegmentsAndUpload(List<File> list, Schema schema, TableConfig tableConfig) throws Exception {
        if (!this._tarDir.exists()) {
            this._tarDir.mkdir();
        }
        if (!this._segmentDir.exists()) {
            this._segmentDir.mkdir();
        }
        ArrayList arrayList = new ArrayList(list);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(arrayList, tableConfig, schema, 0, this._segmentDir, this._tarDir);
        uploadSegmentsToController(getTableName(), this._tarDir, false, false);
        uploadSegmentsToController(getTableName(), this._tarDir, true, false);
        uploadSegmentsToController(getTableName(), this._tarDir, true, true);
        list.addAll(arrayList);
    }

    private void uploadSegmentsToController(String str, File file, boolean z, boolean z2) throws Exception {
        File[] listFiles = file.listFiles();
        Assert.assertNotNull(listFiles);
        int length = listFiles.length;
        Assert.assertTrue(length > 0);
        if (z) {
            length = 1;
        }
        URI create = URI.create(getControllerRequestURLBuilder().forSegmentUpload());
        FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient();
        try {
            if (length == 1) {
                File file2 = listFiles[0];
                if (z2) {
                    changeCrcInSegmentZKMetadata(str, file2.toString());
                }
                Assert.assertEquals(fileUploadDownloadClient.uploadSegment(create, file2.getName(), file2, str, TableType.REALTIME).getStatusCode(), 200);
            } else {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(length);
                ArrayList arrayList = new ArrayList(length);
                for (File file3 : listFiles) {
                    arrayList.add(newFixedThreadPool.submit(() -> {
                        return Integer.valueOf(fileUploadDownloadClient.uploadSegment(create, file3.getName(), file3, str, TableType.REALTIME).getStatusCode());
                    }));
                }
                newFixedThreadPool.shutdown();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(((Integer) ((Future) it.next()).get()).intValue(), 200);
                }
            }
            fileUploadDownloadClient.close();
        } catch (Throwable th) {
            try {
                fileUploadDownloadClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void changeCrcInSegmentZKMetadata(String str, String str2) {
        String substring = str2.substring(str2.indexOf("mytable_"), str2.indexOf(".tar.gz"));
        String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(str);
        SegmentZKMetadata segmentZKMetadata = this._helixResourceManager.getSegmentZKMetadata(tableNameWithType, substring);
        segmentZKMetadata.setCrc(111L);
        this._helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
    }

    protected long getCountStarResult() {
        return super.getCountStarResult() * 2;
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @BeforeClass
    public void setUp() throws Exception {
        System.out.println(String.format("Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s", Long.valueOf(RANDOM_SEED), Boolean.valueOf(this._isDirectAlloc), Boolean.valueOf(this._isConsumerDirConfigured), Boolean.valueOf(this._enableSplitCommit), Boolean.valueOf(this._enableLeadControllerResource)));
        FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
        super.setUp();
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @AfterClass
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
        super.tearDown();
    }

    @Test
    public void testConsumerDirectoryExists() {
        Assert.assertEquals(new File(CONSUMER_DIRECTORY, "mytable_REALTIME").exists(), this._isConsumerDirConfigured, "The off heap consumer directory does not exist");
    }

    @Test
    public void testSegmentFlushSize() {
        for (SegmentZKMetadata segmentZKMetadata : ZKMetadataProvider.getSegmentsZKMetadata(this._propertyStore, TableNameBuilder.REALTIME.tableNameWithType(getTableName()))) {
            if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.UPLOADED) {
                Assert.assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(), getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
            }
        }
    }

    @Test(expectedExceptions = {IOException.class})
    public void testAddHLCTableShouldFail() throws IOException {
        sendPostRequest(this._controllerRequestURLBuilder.forTableCreate(), new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build().toJsonString());
    }

    @Test
    public void testReload() throws Exception {
        testReload(false);
    }

    @Test(dataProvider = "useBothQueryEngines")
    public void testAddRemoveDictionaryAndInvertedIndex(boolean z) throws Exception {
        setUseMultiStageQueryEngine(z);
        notSupportedInV2();
        String str = "SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = -9999";
        long countStarResult = getCountStarResult();
        JsonNode postQuery = postQuery("SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = -9999");
        Assert.assertEquals(postQuery.get("totalDocs").asLong(), countStarResult);
        Assert.assertEquals(postQuery.get("numEntriesScannedInFilter").asLong(), countStarResult);
        long asLong = postQuery.get("resultTable").get("rows").get(0).get(0).asLong();
        TableConfig realtimeTableConfig = getRealtimeTableConfig();
        IndexingConfig indexingConfig = realtimeTableConfig.getIndexingConfig();
        Assert.assertNotNull(indexingConfig.getNoDictionaryColumns());
        Assert.assertNotNull(indexingConfig.getInvertedIndexColumns());
        indexingConfig.getNoDictionaryColumns().remove("ActualElapsedTime");
        indexingConfig.getInvertedIndexColumns().add("ActualElapsedTime");
        updateTableConfig(realtimeTableConfig);
        String reloadTableAndValidateResponse = reloadTableAndValidateResponse(getTableName(), TableType.REALTIME, false);
        TestUtils.waitForCondition(r12 -> {
            try {
                JsonNode postQuery2 = postQuery(str);
                Assert.assertEquals(postQuery2.get("resultTable").get("rows").get(0).get(0).asLong(), asLong);
                Assert.assertEquals(postQuery2.get("totalDocs").asLong(), countStarResult);
                long asLong2 = postQuery2.get("numConsumingSegmentsQueried").asLong();
                long asLong3 = postQuery2.get("minConsumingFreshnessTimeMs").asLong();
                return Boolean.valueOf(asLong2 == 2 && asLong3 > this._startTime && asLong3 < System.currentTimeMillis() && isReloadJobCompleted(reloadTableAndValidateResponse));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, 600000L, "Failed to generate dictionary and inverted index");
        Assert.assertEquals(postQuery("SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = -9999").get("numEntriesScannedInFilter").asLong(), 0L);
        indexingConfig.getNoDictionaryColumns().add("ActualElapsedTime");
        indexingConfig.getInvertedIndexColumns().remove("ActualElapsedTime");
        updateTableConfig(realtimeTableConfig);
        String reloadTableAndValidateResponse2 = reloadTableAndValidateResponse(getTableName(), TableType.REALTIME, false);
        TestUtils.waitForCondition(r122 -> {
            try {
                JsonNode postQuery2 = postQuery(str);
                Assert.assertEquals(postQuery2.get("resultTable").get("rows").get(0).get(0).asLong(), asLong);
                Assert.assertEquals(postQuery2.get("totalDocs").asLong(), countStarResult);
                return Boolean.valueOf(isReloadJobCompleted(reloadTableAndValidateResponse2));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to remove dictionary and inverted index");
        Assert.assertEquals(postQuery("SELECT COUNT(*) FROM myTable WHERE ActualElapsedTime = -9999").get("numEntriesScannedInFilter").asLong(), countStarResult);
    }

    @Test
    public void testReset() throws Exception {
        super.testReset(TableType.REALTIME);
    }

    @Test
    public void testForceCommit() throws Exception {
        Set<String> consumingSegmentsFromIdealState = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
        String forceCommit = forceCommit(getTableName());
        TestUtils.waitForCondition(r7 -> {
            try {
                if (!isForceCommitJobCompleted(forceCommit)) {
                    return false;
                }
                Assert.assertTrue(this._controllerStarter.getHelixResourceManager().getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false).containsAll(consumingSegmentsFromIdealState));
                return true;
            } catch (Exception e) {
                return false;
            }
        }, 60000L, "Error verifying force commit operation on table!");
    }

    public Set<String> getConsumingSegmentsFromIdealState(String str) {
        Map mapFields = this._controllerStarter.getHelixResourceManager().getTableIdealState(str).getRecord().getMapFields();
        HashSet hashSet = new HashSet(HashUtil.getHashMapCapacity(mapFields.size()));
        for (Map.Entry entry : mapFields.entrySet()) {
            if (((Map) entry.getValue()).containsValue("CONSUMING")) {
                hashSet.add((String) entry.getKey());
            }
        }
        return hashSet;
    }

    public boolean isForceCommitJobCompleted(String str) throws Exception {
        JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(sendGetRequest(this._controllerRequestURLBuilder.forForceCommitJobStatus(str)));
        Assert.assertEquals(stringToJsonNode.get("jobId").asText(), str);
        Assert.assertEquals(stringToJsonNode.get("jobType").asText(), "FORCE_COMMIT");
        return stringToJsonNode.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
    }

    private String forceCommit(String str) throws Exception {
        return JsonUtils.stringToJsonNode(sendPostRequest(this._controllerRequestURLBuilder.forTableForceCommit(str), null)).get("forceCommitJobId").asText();
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test
    public void testHardcodedServerPartitionedSqlQueries() throws Exception {
        super.testHardcodedServerPartitionedSqlQueries();
    }
}
