package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.MetricValueUtils;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.SqlResultComparator;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.class */
public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
    /** Raw name of the OFFLINE table exercised by the single-level concat test. */
    private static final String SINGLE_LEVEL_CONCAT_TEST_TABLE = "myTable1";
    /** Raw name of the OFFLINE table exercised by the single-level rollup test. */
    private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2";
    /** Raw name of the OFFLINE table exercised by the multi-level concat test. */
    private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3";
    /** Raw name of the OFFLINE table exercised by the single-level concat test that uses metadata push. */
    private static final String SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE = "myTable4";
    /** Raw table name returned by getTableName(); the realtime concat test resolves it as a REALTIME table. */
    private static final String SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE = "myTable5";
    /** Raw name of the REALTIME table created in setUp() with the two-level, processAll-mode task config. */
    private static final String MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE = "myTable6";
    /** Kafka topic created in setUp() and consumed by the processAll-mode REALTIME table. */
    private static final String PROCESS_ALL_MODE_KAFKA_TOPIC = "myKafkaTopic";
    /** Timeout handed to TestUtils.waitForCondition when validating query results after each task round. */
    private static final long TIMEOUT_IN_MS = 10000;
    // Controller-side handles; all three are assigned at the end of setUp() from _controllerStarter.
    protected PinotHelixTaskResourceManager _helixTaskResourceManager;
    protected PinotTaskManager _taskManager;
    protected PinotHelixResourceManager _pinotHelixResourceManager;
    // Segment build directories under _tempDir. In setUp(), 1-4 receive the segments of myTable1..myTable4 and
    // 5 receives the segments built with the processAll-mode REALTIME table config.
    protected final File _segmentDir1 = new File(this._tempDir, "segmentDir1");
    protected final File _segmentDir2 = new File(this._tempDir, "segmentDir2");
    protected final File _segmentDir3 = new File(this._tempDir, "segmentDir3");
    protected final File _segmentDir4 = new File(this._tempDir, "segmentDir4");
    protected final File _segmentDir5 = new File(this._tempDir, "segmentDir5");
    // Tarball output directories under _tempDir, paired one-to-one with the segment directories above.
    protected final File _tarDir1 = new File(this._tempDir, "tarDir1");
    protected final File _tarDir2 = new File(this._tempDir, "tarDir2");
    protected final File _tarDir3 = new File(this._tempDir, "tarDir3");
    protected final File _tarDir4 = new File(this._tempDir, "tarDir4");
    protected final File _tarDir5 = new File(this._tempDir, "tarDir5");

    /**
     * Brings up the test cluster and provisions every table used by the merge/rollup tests.
     *
     * <p>Statement order matters and is kept as-is: the single {@link Schema} instance is renamed in place and
     * re-registered once per table, so each later step sees whichever schema name was set last.
     *
     * @throws Exception if starting a component, registering a schema/table, or building/uploading segments fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{
            _tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _segmentDir4, _segmentDir5,
            _tarDir1, _tarDir2, _tarDir3, _tarDir4, _tarDir5});

        startZk();
        startController();
        startBroker();
        startServer();
        startMinion();
        startKafka();

        // Register the schema under its initial name, then once more under each OFFLINE table name.
        Schema schema = createSchema();
        addSchema(schema);
        schema.setSchemaName(SINGLE_LEVEL_CONCAT_TEST_TABLE);
        addSchema(schema);
        schema.setSchemaName(SINGLE_LEVEL_ROLLUP_TEST_TABLE);
        addSchema(schema);
        schema.setSchemaName(MULTI_LEVEL_CONCAT_TEST_TABLE);
        addSchema(schema);
        schema.setSchemaName(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
        addSchema(schema);

        TableConfig concatTableConfig =
            createOfflineTableConfig(SINGLE_LEVEL_CONCAT_TEST_TABLE, getSingleLevelConcatTaskConfig());
        TableConfig rollupTableConfig = createOfflineTableConfig(SINGLE_LEVEL_ROLLUP_TEST_TABLE,
            getSingleLevelRollupTaskConfig(), getMultiColumnsSegmentPartitionConfig());
        TableConfig multiLevelTableConfig =
            createOfflineTableConfig(MULTI_LEVEL_CONCAT_TEST_TABLE, getMultiLevelConcatTaskConfig());
        TableConfig metadataPushTableConfig =
            createOfflineTableConfig(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, getSingleLevelConcatMetadataTaskConfig());
        addTableConfig(concatTableConfig);
        addTableConfig(rollupTableConfig);
        addTableConfig(multiLevelTableConfig);
        addTableConfig(metadataPushTableConfig);

        List<File> avroFiles = unpackAvroData(_tempDir);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, concatTableConfig, schema, 0, _segmentDir1,
            _tarDir1);
        // The rollup table gets every Avro file twice (postfix "1" and "2"), so each record is present two times.
        buildSegmentsFromAvroWithPostfix(avroFiles, rollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "1");
        buildSegmentsFromAvroWithPostfix(avroFiles, rollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2");
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, multiLevelTableConfig, schema, 0, _segmentDir3,
            _tarDir3);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, metadataPushTableConfig, schema, 0, _segmentDir4,
            _tarDir4);
        uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
        uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
        uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
        uploadSegments(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, _tarDir4);

        // Realtime table built by the inherited helper from the first Avro file.
        addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));

        // Second realtime table (processAll mode) reading from its own Kafka topic.
        ((StreamDataServerStartable) _kafkaStarters.get(0)).createTopic(PROCESS_ALL_MODE_KAFKA_TOPIC,
            KafkaStarterUtils.getTopicCreationProps(getNumKafkaPartitions()));
        schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
        addSchema(schema);
        TableConfig processAllTableConfig = createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
            MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, PROCESS_ALL_MODE_KAFKA_TOPIC);
        addTableConfig(processAllTableConfig);

        pushAvroIntoKafka(avroFiles);
        // Only files [9, 12) and then [0, 3) are streamed into the processAll topic, in that order;
        // files [3, 9) are built into segments under _segmentDir5/_tarDir5 instead.
        ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(9, 12), "localhost:" + getKafkaPort(),
            PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
            getPartitionColumn(), injectTombstones());
        ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3), "localhost:" + getKafkaPort(),
            PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
            getPartitionColumn(), injectTombstones());
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9), processAllTableConfig, schema, 0,
            _segmentDir5, _tarDir5);

        waitForAllDocsLoaded(600000L);
        setUpH2Connection(avroFiles);
        setUpQueryGenerator(avroFiles);

        _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
        _taskManager = _controllerStarter.getTaskManager();
        _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
    }

    /**
     * Returns the raw table name {@code "myTable5"}.
     *
     * <p>The realtime single-level concat test reads this value and converts it to a REALTIME table name.
     * NOTE(review): presumably also consumed by inherited base-class helpers — confirm against the base class.
     *
     * @return the raw (type-less) table name
     */
    protected String getTableName() {
        return SINGLE_LEVEL_CONCAT_TEST_REALTIME_TABLE;
    }

    /**
     * Builds a single-level ({@code 100days}) concat MergeRollupTask configuration.
     *
     * <p>NOTE(review): presumably picked up by the inherited realtime table config builder — confirm against the
     * base class.
     *
     * @return a task config holding one {@code MergeRollupTask} entry
     */
    protected TableTaskConfig getTaskConfig() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        // Merge level "100days": concat segments into 100-day buckets, at most 15000 records per segment/task.
        mergeRollupConfigs.put("100days.mergeType", "concat");
        mergeRollupConfigs.put("100days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("100days.bucketTimePeriod", "100d");
        mergeRollupConfigs.put("100days.maxNumRecordsPerSegment", "15000");
        mergeRollupConfigs.put("100days.maxNumRecordsPerTask", "15000");
        // Per-column aggregation types.
        mergeRollupConfigs.put("ActualElapsedTime.aggregationType", "min");
        mergeRollupConfigs.put("WeatherDelay.aggregationType", "sum");
        return new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs));
    }

    /**
     * Creates an OFFLINE table config without a segment partition config.
     *
     * @param str raw table name
     * @param tableTaskConfig task configuration to attach to the table
     * @return the result of the three-argument overload called with a {@code null} partition config
     */
    private TableConfig createOfflineTableConfig(String str, TableTaskConfig tableTaskConfig) {
        return createOfflineTableConfig(str, tableTaskConfig, null);
    }

    /**
     * Creates an OFFLINE table config from the test's inherited defaults plus the given task and partition settings.
     *
     * @param str raw table name
     * @param tableTaskConfig task configuration to attach to the table
     * @param segmentPartitionConfig partition configuration, or {@code null} for none
     * @return the built table config
     */
    private TableConfig createOfflineTableConfig(String str, TableTaskConfig tableTaskConfig, @Nullable SegmentPartitionConfig segmentPartitionConfig) {
        TableConfigBuilder offlineBuilder = new TableConfigBuilder(TableType.OFFLINE)
            .setTableName(str)
            .setTimeColumnName(getTimeColumnName())
            .setSortedColumn(getSortedColumn())
            .setInvertedIndexColumns(getInvertedIndexColumns())
            .setNoDictionaryColumns(getNoDictionaryColumns())
            .setRangeIndexColumns(getRangeIndexColumns())
            .setBloomFilterColumns(getBloomFilterColumns())
            .setFieldConfigList(getFieldConfigs())
            .setNumReplicas(getNumReplicas())
            .setSegmentVersion(getSegmentVersion())
            .setLoadMode(getLoadMode())
            .setTaskConfig(tableTaskConfig)
            .setBrokerTenant(getBrokerTenant())
            .setServerTenant(getServerTenant())
            .setIngestionConfig(getIngestionConfig())
            .setNullHandlingEnabled(getNullHandlingEnabled())
            .setSegmentPartitionConfig(segmentPartitionConfig);
        return offlineBuilder.build();
    }

    /**
     * Creates a REALTIME table config whose MergeRollupTask has two concat levels ({@code 100days} and
     * {@code 200days}) and runs in {@code processAll} mode.
     *
     * <p>Side effect: stores {@code file} in the static
     * {@code ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder._avroFile} field before building the config.
     *
     * @param file Avro file assigned to the decoder's static {@code _avroFile} field
     * @param str raw table name
     * @param str2 Kafka topic name written into the stream config
     * @return the built table config
     */
    protected TableConfig createRealtimeTableConfigWithProcessAllMode(File file, String str, String str2) {
        ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder._avroFile = file;

        // Start from the inherited stream config and point it at the requested topic.
        Map<String, String> streamConfigs = getStreamConfigMap();
        streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka", "topic.name"), str2);

        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("100days.mergeType", "concat");
        mergeRollupConfigs.put("100days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("100days.bucketTimePeriod", "100d");
        mergeRollupConfigs.put("100days.maxNumRecordsPerSegment", "15000");
        mergeRollupConfigs.put("100days.maxNumRecordsPerTask", "15000");
        mergeRollupConfigs.put("200days.mergeType", "concat");
        mergeRollupConfigs.put("200days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("200days.bucketTimePeriod", "200d");
        mergeRollupConfigs.put("200days.maxNumRecordsPerSegment", "15000");
        mergeRollupConfigs.put("200days.maxNumRecordsPerTask", "30000");
        mergeRollupConfigs.put("ActualElapsedTime.aggregationType", "min");
        mergeRollupConfigs.put("WeatherDelay.aggregationType", "sum");
        mergeRollupConfigs.put("mode", "processAll");

        return new TableConfigBuilder(TableType.REALTIME)
            .setTableName(str)
            .setTimeColumnName(getTimeColumnName())
            .setSortedColumn(getSortedColumn())
            .setInvertedIndexColumns(getInvertedIndexColumns())
            .setNoDictionaryColumns(getNoDictionaryColumns())
            .setRangeIndexColumns(getRangeIndexColumns())
            .setBloomFilterColumns(getBloomFilterColumns())
            .setFieldConfigList(getFieldConfigs())
            .setNumReplicas(getNumReplicas())
            .setSegmentVersion(getSegmentVersion())
            .setLoadMode(getLoadMode())
            .setTaskConfig(new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs)))
            .setBrokerTenant(getBrokerTenant())
            .setServerTenant(getServerTenant())
            .setIngestionConfig(getIngestionConfig())
            .setQueryConfig(getQueryConfig())
            .setStreamConfigs(streamConfigs)
            .setNullHandlingEnabled(getNullHandlingEnabled())
            .build();
    }

    /**
     * Builds the MergeRollupTask configuration for the single-level concat table: one {@code 100days} concat level
     * with {@code overwriteOutput} enabled.
     *
     * @return a task config holding one {@code MergeRollupTask} entry
     */
    private TableTaskConfig getSingleLevelConcatTaskConfig() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("100days.mergeType", "concat");
        mergeRollupConfigs.put("100days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("100days.bucketTimePeriod", "100d");
        mergeRollupConfigs.put("100days.maxNumRecordsPerSegment", "15000");
        mergeRollupConfigs.put("100days.maxNumRecordsPerTask", "15000");
        mergeRollupConfigs.put("ActualElapsedTime.aggregationType", "min");
        mergeRollupConfigs.put("WeatherDelay.aggregationType", "sum");
        mergeRollupConfigs.put("overwriteOutput", "true");
        return new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs));
    }

    /**
     * Builds the same single-level ({@code 100days}) concat configuration as the plain concat table, with
     * {@code push.mode} additionally set to the METADATA segment push type.
     *
     * @return a task config holding one {@code MergeRollupTask} entry
     */
    private TableTaskConfig getSingleLevelConcatMetadataTaskConfig() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("100days.mergeType", "concat");
        mergeRollupConfigs.put("100days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("100days.bucketTimePeriod", "100d");
        mergeRollupConfigs.put("100days.maxNumRecordsPerSegment", "15000");
        mergeRollupConfigs.put("100days.maxNumRecordsPerTask", "15000");
        mergeRollupConfigs.put("ActualElapsedTime.aggregationType", "min");
        mergeRollupConfigs.put("WeatherDelay.aggregationType", "sum");
        mergeRollupConfigs.put("overwriteOutput", "true");
        // The only difference from the plain concat config: merged segments are pushed in METADATA mode.
        mergeRollupConfigs.put("push.mode", BatchConfigProperties.SegmentPushType.METADATA.toString());
        return new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs));
    }

    /**
     * Builds the MergeRollupTask configuration for the rollup table: one {@code 150days} rollup level whose time
     * values are rounded with a {@code 7d} round-bucket period, with {@code overwriteOutput} enabled.
     *
     * @return a task config holding one {@code MergeRollupTask} entry
     */
    private TableTaskConfig getSingleLevelRollupTaskConfig() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        mergeRollupConfigs.put("150days.mergeType", "rollup");
        mergeRollupConfigs.put("150days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("150days.bucketTimePeriod", "150d");
        mergeRollupConfigs.put("150days.roundBucketTimePeriod", "7d");
        mergeRollupConfigs.put("overwriteOutput", "true");
        return new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs));
    }

    /**
     * Builds the MergeRollupTask configuration for the multi-level concat table: two concat levels,
     * {@code 45days} and {@code 90days}, each capped at 100000 records per segment and per task.
     *
     * @return a task config holding one {@code MergeRollupTask} entry
     */
    private TableTaskConfig getMultiLevelConcatTaskConfig() {
        Map<String, String> mergeRollupConfigs = new HashMap<>();
        // Lower level: 45-day buckets.
        mergeRollupConfigs.put("45days.mergeType", "concat");
        mergeRollupConfigs.put("45days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("45days.bucketTimePeriod", "45d");
        mergeRollupConfigs.put("45days.maxNumRecordsPerSegment", "100000");
        mergeRollupConfigs.put("45days.maxNumRecordsPerTask", "100000");
        // Higher level: 90-day buckets.
        mergeRollupConfigs.put("90days.mergeType", "concat");
        mergeRollupConfigs.put("90days.bufferTimePeriod", "1d");
        mergeRollupConfigs.put("90days.bucketTimePeriod", "90d");
        mergeRollupConfigs.put("90days.maxNumRecordsPerSegment", "100000");
        mergeRollupConfigs.put("90days.maxNumRecordsPerTask", "100000");
        mergeRollupConfigs.put("overwriteOutput", "true");
        return new TableTaskConfig(Collections.singletonMap("MergeRollupTask", mergeRollupConfigs));
    }

    /**
     * Builds a segment partition config over two columns, {@code AirlineID} and {@code Month}, each using the
     * {@code murmur} function with a single partition.
     *
     * @return the two-column partition config
     */
    private SegmentPartitionConfig getMultiColumnsSegmentPartitionConfig() {
        Map<String, ColumnPartitionConfig> columnPartitions = new HashMap<>();
        columnPartitions.put("AirlineID", new ColumnPartitionConfig("murmur", 1));
        columnPartitions.put("Month", new ColumnPartitionConfig("murmur", 1));
        return new SegmentPartitionConfig(columnPartitions);
    }

    /**
     * Builds one segment per Avro file in parallel and packs each built segment into a {@code .tar.gz} archive.
     *
     * <p>Every segment is given the name postfix {@code <index>_<str>}, where {@code index} is the file's position
     * in {@code list} plus {@code i}. One worker thread is created per input file. The method returns only after
     * every build has finished.
     *
     * @param list Avro input files; an empty list is a no-op
     * @param tableConfig table config handed to the segment generator; also supplies the table name
     * @param schema schema handed to the segment generator
     * @param i base value added to each file's list position to form the segment index
     * @param file directory the segments are generated into
     * @param file2 directory receiving the {@code <segmentName>.tar.gz} archives
     * @param str suffix appended (after an underscore) to the segment index in the name postfix
     * @throws Exception if waiting on any build fails, including failures raised inside a build
     */
    private static void buildSegmentsFromAvroWithPostfix(List<File> list, TableConfig tableConfig, Schema schema, int i, File file, File file2, String str) throws Exception {
        int numAvroFiles = list.size();
        if (numAvroFiles == 0) {
            // Executors.newFixedThreadPool(0) throws IllegalArgumentException, and there is nothing to build.
            return;
        }

        ExecutorService executor = Executors.newFixedThreadPool(numAvroFiles);
        List<Future<Void>> pendingBuilds = new ArrayList<>(numAvroFiles);
        try {
            for (int fileIndex = 0; fileIndex < numAvroFiles; fileIndex++) {
                File avroFile = list.get(fileIndex);
                int segmentIndex = fileIndex + i;
                Future<Void> submitted = executor.submit(() -> {
                    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
                    generatorConfig.setInputFilePath(avroFile.getPath());
                    generatorConfig.setOutDir(file.getPath());
                    generatorConfig.setTableName(tableConfig.getTableName());
                    generatorConfig.setSegmentNamePostfix(segmentIndex + "_" + str);

                    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
                    driver.init(generatorConfig);
                    driver.build();

                    String segmentName = driver.getSegmentName();
                    TarCompressionUtils.createCompressedTarFile(new File(file, segmentName),
                        new File(file2, segmentName + ".tar.gz"));
                    return null;
                });
                pendingBuilds.add(submitted);
            }
        } finally {
            // Stop accepting new work even if submission failed part-way; already-submitted builds still run.
            executor.shutdown();
        }

        // Block until every build is done; Future.get() rethrows a build failure as ExecutionException.
        for (Future<Void> pendingBuild : pendingBuilds) {
            pendingBuild.get();
        }
    }

    /**
     * Runs MergeRollupTask on the single-level concat OFFLINE table ({@code myTable1}) until no further task is
     * scheduled, validating every round.
     *
     * <p>Per round it checks the number of subtasks, the {@code 100days} watermark (starting at day 16000 and
     * advancing by one 100-day bucket per round), the custom metadata and time range of every merged segment, and
     * that {@code count(*)} is unchanged while the number of segments queried matches the expectation. Exactly 5
     * rounds are expected; afterwards the table is dropped and its lineage/task metadata must disappear.
     *
     * @throws Exception if a query or a cluster call fails
     */
    @Test
    public void testOfflineTableSingleLevelConcat() throws Exception {
        final long msPerDay = 86_400_000L;
        final long bucketMs = 100L * msPerDay;

        String query = "SELECT count(*) FROM myTable1";
        JsonNode expectedJson = postQuery(query);
        int[] expectedNumSubTasks = {1, 2, 2, 2, 1};
        int[] expectedNumSegmentsQueried = {13, 12, 13, 13, 12};
        long expectedWatermark = 16000L * msPerDay;
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE);
        int numTasks = 0;
        TaskSchedulingContext schedulingContext =
            new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(offlineTableName));
        String taskName = (String) ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
            .get("MergeRollupTask")).getScheduledTaskNames().get(0);

        while (taskName != null) {
            Assert.assertEquals(_helixTaskResourceManager.getSubtaskConfigs(taskName).size(),
                expectedNumSubTasks[numTasks]);
            Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
                .contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again before waiting for completion must not yield a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(_taskManager.scheduleTasks(schedulingContext).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // Watermark bookkeeping.
            MergeRollupTaskMetadata taskMetadata = MergeRollupTaskMetadata.fromZNRecord(
                _taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask",
                    offlineTableName));
            Assert.assertNotNull(taskMetadata);
            Assert.assertEquals(((Long) taskMetadata.getWatermarkMap().get("100days")).longValue(),
                expectedWatermark);
            expectedWatermark += bucketMs;

            // Every merged segment carries its merge level and stays inside a single 100-day bucket.
            for (SegmentZKMetadata segmentMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(
                offlineTableName)) {
                if (segmentMetadata.getSegmentName().startsWith("merged")) {
                    Assert.assertNotNull(segmentMetadata.getCustomMap());
                    // TestNG order is (actual, expected).
                    Assert.assertEquals((String) segmentMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"),
                        "100days");
                    Assert.assertEquals(segmentMetadata.getEndTimeMs() / bucketMs,
                        segmentMetadata.getStartTimeMs() / bucketMs);
                }
            }

            // numTasks is mutated below, so the lambda captures a per-round copy.
            final int round = numTasks;
            TestUtils.waitForCondition(unused -> {
                try {
                    JsonNode actualJson = postQuery(query);
                    if (!SqlResultComparator.areEqual(actualJson, expectedJson, query)) {
                        return false;
                    }
                    return actualJson.get("numSegmentsQueried").asInt() == expectedNumSegmentsQueried[round];
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, TIMEOUT_IN_MS, "Timeout while validating segments");

            // Ask for the next round; a null or empty list ends the loop.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
                .get("MergeRollupTask")).getScheduledTaskNames();
            taskName = (scheduledTaskNames == null || scheduledTaskNames.isEmpty())
                ? null : (String) scheduledTaskNames.get(0);
            numTasks++;
        }

        Assert.assertEquals(numTasks, 5);
        Assert.assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
            "mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"));
        dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE);
        verifyTableDelete(offlineTableName);
    }

    /**
     * Runs MergeRollupTask on the metadata-push OFFLINE table ({@code myTable4}) until no further task is
     * scheduled, validating every round.
     *
     * <p>Per round it checks the number of subtasks, the {@code 100days} watermark (starting at day 16000 and
     * advancing by one 100-day bucket per round), the custom metadata and time range of every merged segment, and
     * that {@code count(*)} is unchanged while the number of segments queried matches the expectation. Exactly 5
     * rounds are expected; afterwards the table is dropped and its lineage/task metadata must disappear.
     *
     * @throws Exception if a query or a cluster call fails
     */
    @Test
    public void testOfflineTableSingleLevelConcatWithMetadataPush() throws Exception {
        final long msPerDay = 86_400_000L;
        final long bucketMs = 100L * msPerDay;

        String query = "SELECT count(*) FROM myTable4";
        JsonNode expectedJson = postQuery(query);
        int[] expectedNumSubTasks = {1, 2, 2, 2, 1};
        int[] expectedNumSegmentsQueried = {13, 12, 13, 13, 12};
        long expectedWatermark = 16000L * msPerDay;
        String offlineTableName =
            TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
        int numTasks = 0;
        TaskSchedulingContext schedulingContext =
            new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(offlineTableName));
        String taskName = (String) ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
            .get("MergeRollupTask")).getScheduledTaskNames().get(0);

        while (taskName != null) {
            Assert.assertEquals(_helixTaskResourceManager.getSubtaskConfigs(taskName).size(),
                expectedNumSubTasks[numTasks]);
            Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
                .contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again before waiting for completion must not yield a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(_taskManager.scheduleTasks(schedulingContext).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // Watermark bookkeeping.
            MergeRollupTaskMetadata taskMetadata = MergeRollupTaskMetadata.fromZNRecord(
                _taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask",
                    offlineTableName));
            Assert.assertNotNull(taskMetadata);
            Assert.assertEquals(((Long) taskMetadata.getWatermarkMap().get("100days")).longValue(),
                expectedWatermark);
            expectedWatermark += bucketMs;

            // Every merged segment carries its merge level and stays inside a single 100-day bucket.
            for (SegmentZKMetadata segmentMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(
                offlineTableName)) {
                if (segmentMetadata.getSegmentName().startsWith("merged")) {
                    Assert.assertNotNull(segmentMetadata.getCustomMap());
                    // TestNG order is (actual, expected).
                    Assert.assertEquals((String) segmentMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"),
                        "100days");
                    Assert.assertEquals(segmentMetadata.getEndTimeMs() / bucketMs,
                        segmentMetadata.getStartTimeMs() / bucketMs);
                }
            }

            // numTasks is mutated below, so the lambda captures a per-round copy.
            final int round = numTasks;
            TestUtils.waitForCondition(unused -> {
                try {
                    JsonNode actualJson = postQuery(query);
                    if (!SqlResultComparator.areEqual(actualJson, expectedJson, query)) {
                        return false;
                    }
                    return actualJson.get("numSegmentsQueried").asInt() == expectedNumSegmentsQueried[round];
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, TIMEOUT_IN_MS, "Timeout while validating segments");

            // Ask for the next round; a null or empty list ends the loop.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
                .get("MergeRollupTask")).getScheduledTaskNames();
            taskName = (scheduledTaskNames == null || scheduledTaskNames.isEmpty())
                ? null : (String) scheduledTaskNames.get(0);
            numTasks++;
        }

        Assert.assertEquals(numTasks, 5);
        Assert.assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
            "mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));
        dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
        verifyTableDelete(offlineTableName);
    }

    /**
     * Runs MergeRollupTask on the rollup OFFLINE table ({@code myTable2}) until no further task is scheduled,
     * validating every round.
     *
     * <p>Per round it expects exactly one subtask, checks the {@code 150days} watermark (starting at day 16050 and
     * advancing by one 150-day bucket per round), the custom metadata and time range of every merged segment, and
     * waits until {@code count(*)} has dropped below the pre-merge count while the number of segments queried
     * matches the expectation. After the loop the total count must be half the original, every
     * {@code DaysSinceEpoch} value must be a multiple of 7, and exactly 3 rounds must have run.
     *
     * @throws Exception if a query or a cluster call fails
     */
    @Test
    public void testOfflineTableSingleLevelRollup() throws Exception {
        final long msPerDay = 86_400_000L;
        final long bucketMs = 150L * msPerDay;

        String query = "SELECT count(*) FROM myTable2";
        JsonNode originalJson = postQuery(query);
        int[] expectedNumSegmentsQueried = {16, 7, 3};
        long expectedWatermark = 16050L * msPerDay;
        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE);
        int numTasks = 0;
        TaskSchedulingContext schedulingContext =
            new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(offlineTableName));
        String taskName = (String) ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
            .get("MergeRollupTask")).getScheduledTaskNames().get(0);

        while (taskName != null) {
            Assert.assertEquals(_helixTaskResourceManager.getSubtaskConfigs(taskName).size(), 1);
            Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
                .contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again before waiting for completion must not yield a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(_taskManager.scheduleTasks(schedulingContext).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // Watermark bookkeeping.
            MergeRollupTaskMetadata taskMetadata = MergeRollupTaskMetadata.fromZNRecord(
                _taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask",
                    offlineTableName));
            Assert.assertNotNull(taskMetadata);
            Assert.assertEquals(((Long) taskMetadata.getWatermarkMap().get("150days")).longValue(),
                expectedWatermark);
            expectedWatermark += bucketMs;

            // Every merged segment carries its merge level and stays inside a single 150-day bucket.
            for (SegmentZKMetadata segmentMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(
                offlineTableName)) {
                if (segmentMetadata.getSegmentName().startsWith("merged")) {
                    Assert.assertNotNull(segmentMetadata.getCustomMap());
                    // TestNG order is (actual, expected).
                    Assert.assertEquals((String) segmentMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"),
                        "150days");
                    Assert.assertEquals(segmentMetadata.getEndTimeMs() / bucketMs,
                        segmentMetadata.getStartTimeMs() / bucketMs);
                }
            }

            // numTasks is mutated below, so the lambda captures a per-round copy.
            final int round = numTasks;
            TestUtils.waitForCondition(unused -> {
                try {
                    JsonNode actualJson = postQuery(query);
                    int actualCount = actualJson.get("resultTable").get("rows").get(0).get(0).asInt();
                    int originalCount = originalJson.get("resultTable").get("rows").get(0).get(0).asInt();
                    // Rollup must have reduced the document count before the segment count is compared.
                    if (actualCount >= originalCount) {
                        return false;
                    }
                    return actualJson.get("numSegmentsQueried").asInt() == expectedNumSegmentsQueried[round];
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, TIMEOUT_IN_MS, "Timeout while validating segments");

            // Ask for the next round; a null or empty list ends the loop.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
                .get("MergeRollupTask")).getScheduledTaskNames();
            taskName = (scheduledTaskNames == null || scheduledTaskNames.isEmpty())
                ? null : (String) scheduledTaskNames.get(0);
            numTasks++;
        }

        // After all rounds the table holds half as many documents as before.
        int finalCount = postQuery("SELECT count(*) FROM myTable2").get("resultTable").get("rows").get(0).get(0)
            .asInt();
        Assert.assertEquals(finalCount, originalJson.get("resultTable").get("rows").get(0).get(0).asInt() / 2);

        // Every remaining DaysSinceEpoch value is a multiple of 7.
        JsonNode groupByJson = postQuery(
            "SELECT count(*), DaysSinceEpoch FROM myTable2 GROUP BY DaysSinceEpoch ORDER BY DaysSinceEpoch");
        JsonNode groupByRows = groupByJson.get("resultTable").get("rows");
        for (int rowId = 0; rowId < groupByRows.size(); rowId++) {
            Assert.assertTrue(groupByRows.get(rowId).get(1).asInt() % 7 == 0);
        }

        Assert.assertEquals(numTasks, 3);
        Assert.assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
            "mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"));
    }

    /**
     * Runs MergeRollupTask on the multi-level concat OFFLINE table ({@code myTable3}) until no further task is
     * scheduled, validating every round.
     *
     * <p>Per round it checks the number of subtasks, the {@code 45days} and {@code 90days} watermarks (the
     * {@code 90days} watermark is expected to be absent after the first round), the custom metadata and time range
     * of every merged segment per level, and that {@code count(*)} is unchanged while the number of segments
     * queried matches the expectation. Exactly 8 rounds are expected.
     *
     * @throws Exception if a query or a cluster call fails
     */
    @Test
    public void testOfflineTableMultiLevelConcat() throws Exception {
        final long msPerDay = 86_400_000L;
        final long bucket45DaysMs = 45L * msPerDay;
        final long bucket90DaysMs = 90L * msPerDay;

        String query = "SELECT count(*) FROM myTable3";
        JsonNode expectedJson = postQuery(query);
        // These two arrays hold 9 entries although 8 rounds are asserted below; the last entry is not reached
        // when the test passes.
        int[] expectedNumSubTasks = {1, 2, 1, 2, 1, 2, 1, 2, 1};
        int[] expectedNumSegmentsQueried = {12, 12, 11, 10, 9, 8, 7, 6, 5};
        // Watermarks are listed in days since epoch and converted to milliseconds below.
        Long[] expectedWatermarks45Days = {16065L, 16110L, 16155L, 16200L, 16245L, 16290L, 16335L, 16380L};
        Long[] expectedWatermarks90Days = {null, 16020L, 16020L, 16110L, 16110L, 16200L, 16200L, 16290L};
        for (int idx = 0; idx < expectedWatermarks45Days.length; idx++) {
            expectedWatermarks45Days[idx] = expectedWatermarks45Days[idx] * msPerDay;
        }
        // Index 0 stays null: no 90days watermark is expected after the first round.
        for (int idx = 1; idx < expectedWatermarks90Days.length; idx++) {
            expectedWatermarks90Days[idx] = expectedWatermarks90Days[idx] * msPerDay;
        }

        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE);
        int numTasks = 0;
        TaskSchedulingContext schedulingContext =
            new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(offlineTableName));
        String taskName = (String) ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
            .get("MergeRollupTask")).getScheduledTaskNames().get(0);

        while (taskName != null) {
            Assert.assertEquals(_helixTaskResourceManager.getSubtaskConfigs(taskName).size(),
                expectedNumSubTasks[numTasks]);
            Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
                .contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again before waiting for completion must not yield a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(_taskManager.scheduleTasks(schedulingContext).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // Watermark bookkeeping for both levels.
            MergeRollupTaskMetadata taskMetadata = MergeRollupTaskMetadata.fromZNRecord(
                _taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask",
                    offlineTableName));
            Assert.assertNotNull(taskMetadata);
            Assert.assertEquals((Long) taskMetadata.getWatermarkMap().get("45days"),
                expectedWatermarks45Days[numTasks]);
            Assert.assertEquals((Long) taskMetadata.getWatermarkMap().get("90days"),
                expectedWatermarks90Days[numTasks]);

            // Every merged segment carries its merge level and stays inside a single bucket of that level.
            for (SegmentZKMetadata segmentMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(
                offlineTableName)) {
                String segmentName = segmentMetadata.getSegmentName();
                if (segmentName.startsWith("merged")) {
                    Assert.assertNotNull(segmentMetadata.getCustomMap());
                    // TestNG order is (actual, expected).
                    if (segmentName.startsWith("merged_45days")) {
                        Assert.assertEquals(
                            (String) segmentMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"), "45days");
                        Assert.assertEquals(segmentMetadata.getEndTimeMs() / bucket45DaysMs,
                            segmentMetadata.getStartTimeMs() / bucket45DaysMs);
                    }
                    if (segmentName.startsWith("merged_90days")) {
                        Assert.assertEquals(
                            (String) segmentMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"), "90days");
                        Assert.assertEquals(segmentMetadata.getEndTimeMs() / bucket90DaysMs,
                            segmentMetadata.getStartTimeMs() / bucket90DaysMs);
                    }
                }
            }

            // numTasks is mutated below, so the lambda captures a per-round copy.
            final int round = numTasks;
            TestUtils.waitForCondition(unused -> {
                try {
                    JsonNode actualJson = postQuery(query);
                    if (!SqlResultComparator.areEqual(actualJson, expectedJson, query)) {
                        return false;
                    }
                    return actualJson.get("numSegmentsQueried").asInt() == expectedNumSegmentsQueried[round];
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, TIMEOUT_IN_MS, "Timeout while validating segments");

            // Ask for the next round; a null or empty list ends the loop.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) _taskManager.scheduleTasks(schedulingContext)
                .get("MergeRollupTask")).getScheduledTaskNames();
            taskName = (scheduledTaskNames == null || scheduledTaskNames.isEmpty())
                ? null : (String) scheduledTaskNames.get(0);
            numTasks++;
        }

        Assert.assertEquals(numTasks, 8);
        Assert.assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
            "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days"));
        Assert.assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
            "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"));
    }

    /**
     * Waits until neither a segment lineage nor MergeRollupTask minion task metadata can be found in the
     * property store for the given table. Callers in this class pass the table name with type.
     *
     * @param str table name handed to the lineage and task-metadata lookups
     */
    protected void verifyTableDelete(String str) {
        // NOTE(review): 1000L / 60000L are presumably the poll interval and the timeout in ms — confirm
        // against TestUtils.waitForCondition.
        TestUtils.waitForCondition(ignored -> {
            boolean lineageGone = SegmentLineageAccessHelper.getSegmentLineage(this._propertyStore, str) == null;
            // Short-circuit: task metadata is only fetched once the lineage lookup came back empty.
            return lineageGone
                    && MinionTaskMetadataUtils.fetchTaskMetadata(this._propertyStore, "MergeRollupTask", str) == null;
        }, 1000L, 60000L, "Failed to delete table");
    }

    /**
     * Waits until every task state reported for the "MergeRollupTask" task type is
     * {@link TaskState#COMPLETED}. An empty state map counts as complete.
     */
    protected void waitForTaskToComplete() {
        // NOTE(review): 600000L is presumably the timeout in ms — confirm against TestUtils.waitForCondition.
        TestUtils.waitForCondition(
                ignored -> this._helixTaskResourceManager.getTaskStates("MergeRollupTask").values().stream()
                        .allMatch(state -> state == TaskState.COMPLETED),
                600000L, "Failed to complete task");
    }

    /**
     * Repeatedly schedules MergeRollupTask for the realtime table until the scheduler returns no task
     * names. After each round it checks the "100days" watermark, the custom map and time range of every
     * segment whose name starts with "merged", and that the count(*) result still matches the result
     * captured before any merge. Exactly 5 rounds are expected; finally the table is dropped and its
     * lineage / task metadata cleanup is verified.
     *
     * @throws Exception if a query, the table drop, or any other helper call fails
     */
    @Test
    public void testRealtimeTableSingleLevelConcat() throws Exception {
        // 8,640,000,000 ms = 100 days (100 * 86,400,000); used both as watermark step and bucket width.
        final long bucketMs = 8640000000L;

        PinotHelixTaskResourceManager helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        PinotTaskManager taskManager = this._controllerStarter.getTaskManager();
        PinotHelixResourceManager helixResourceManager = this._controllerStarter.getHelixResourceManager();
        String tableName = getTableName();
        String countQuery = "SELECT count(*) FROM " + tableName;
        JsonNode expectedResponse = postQuery(countQuery);
        // 1,382,400,000,000 ms = 16,000 days; the expected watermark advances by one bucket per round.
        long expectedWatermarkMs = 1382400000000L;
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        TaskSchedulingContext tablesToSchedule = new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(tableNameWithType));

        int numTasks = 0;
        while (true) {
            // Schedule the next round. A null or empty name list means nothing is left to merge; the
            // first call is guarded the same way as later ones so that an unexpectedly empty schedule
            // surfaces through the task-count assertion below rather than an NPE / IndexOutOfBounds.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) taskManager.scheduleTasks(tablesToSchedule).get("MergeRollupTask")).getScheduledTaskNames();
            if (scheduledTaskNames == null || scheduledTaskNames.isEmpty()) {
                break;
            }

            Assert.assertTrue(helixTaskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again for the same table must not produce a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(taskManager.scheduleTasks(tablesToSchedule).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // Watermark check for the single "100days" level.
            MergeRollupTaskMetadata taskMetadata = MergeRollupTaskMetadata.fromZNRecord(taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask", tableNameWithType));
            Assert.assertNotNull(taskMetadata);
            Assert.assertEquals(((Long) taskMetadata.getWatermarkMap().get("100days")).longValue(), expectedWatermarkMs);
            expectedWatermarkMs += bucketMs;

            // Every merged segment must carry the merge level and lie within a single bucket.
            for (SegmentZKMetadata segmentZKMetadata : helixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
                if (!segmentZKMetadata.getSegmentName().startsWith("merged")) {
                    continue;
                }
                Assert.assertNotNull(segmentZKMetadata.getCustomMap());
                // Actual value first, matching the argument order of the other assertions in this test.
                Assert.assertEquals((String) segmentZKMetadata.getCustomMap().get("MergeRollupTask.mergeLevel"), "100days");
                Assert.assertEquals(segmentZKMetadata.getEndTimeMs() / bucketMs, segmentZKMetadata.getStartTimeMs() / bucketMs);
            }

            // Merging must not change the query result.
            TestUtils.waitForCondition(ignored -> {
                try {
                    return SqlResultComparator.areEqual(postQuery(countQuery), expectedResponse, countQuery);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, TIMEOUT_IN_MS, "Timeout while validating segments");

            numTasks++;
        }

        Assert.assertEquals(numTasks, 5);
        Assert.assertTrue(MetricValueUtils.gaugeExists(this._controllerStarter.getControllerMetrics(), "mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
        dropRealtimeTable(tableName);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Schedules MergeRollupTask rounds for {@code MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE} in two
     * phases: first until nothing more is scheduled (4 rounds expected), then again after uploading the
     * segments from {@code _tarDir5} (8 rounds expected in total). After each round the
     * "numBucketsToProcess" gauges of the "100days" and "200days" levels are compared against the
     * per-round expected values. Finally the table is dropped and its cleanup is verified.
     *
     * @throws Exception if a query, the segment upload, the table drop, or any other helper call fails
     */
    @Test
    public void testRealtimeTableProcessAllModeMultiLevelConcat() throws Exception {
        final String gauge100Days = "mergeRollupTaskNumBucketsToProcess.myTable6_REALTIME.100days";
        final String gauge200Days = "mergeRollupTaskNumBucketsToProcess.myTable6_REALTIME.200days";

        PinotHelixTaskResourceManager helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        PinotTaskManager taskManager = this._controllerStarter.getTaskManager();
        String countQuery = "SELECT count(*) FROM " + MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE;
        JsonNode expectedResponse = postQuery(countQuery);
        // Expected gauge values indexed by round: entries 0-3 belong to phase one, 4-7 to phase two.
        long[] expectedBuckets100Days = {2, 1, 0, 0, 3, 2, 1, 0};
        long[] expectedBuckets200Days = {0, 0, 2, 1, 0, 0, 1, 1};
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
        TaskSchedulingContext tablesToSchedule = new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(tableNameWithType));

        // Phase one: merge the segments already present in the table.
        int numTasks = 0;
        while (true) {
            // A null or empty name list means nothing is left to schedule. The first call is guarded
            // like the later ones so an unexpectedly empty schedule fails the round-count assertion
            // instead of throwing an NPE / IndexOutOfBounds.
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) taskManager.scheduleTasks(tablesToSchedule).get("MergeRollupTask")).getScheduledTaskNames();
            if (scheduledTaskNames == null || scheduledTaskNames.isEmpty()) {
                break;
            }

            Assert.assertTrue(helixTaskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName("MergeRollupTask")));
            // Scheduling again for the same table must not produce a RealtimeToOfflineSegmentsTask entry.
            Assert.assertNull(taskManager.scheduleTasks(tablesToSchedule).get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete();

            // No MergeRollupTask metadata record is expected for this table.
            Assert.assertNull(taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("MergeRollupTask", tableNameWithType));
            Assert.assertTrue(MetricValueUtils.gaugeExists(this._controllerStarter.getControllerMetrics(), gauge100Days));
            Assert.assertTrue(MetricValueUtils.gaugeExists(this._controllerStarter.getControllerMetrics(), gauge200Days));
            Assert.assertEquals(MetricValueUtils.getGaugeValue(this._controllerStarter.getControllerMetrics(), gauge100Days), expectedBuckets100Days[numTasks]);
            Assert.assertEquals(MetricValueUtils.getGaugeValue(this._controllerStarter.getControllerMetrics(), gauge200Days), expectedBuckets200Days[numTasks]);

            numTasks++;
        }
        Assert.assertEquals(numTasks, 4);
        // Merging must not change the query result.
        Assert.assertTrue(SqlResultComparator.areEqual(postQuery(countQuery), expectedResponse, countQuery));

        // Phase two: upload additional segments and merge again, continuing the round counter.
        uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, TableType.REALTIME, this._tarDir5);
        waitForAllDocsLoaded(600000L);
        while (true) {
            List<?> scheduledTaskNames = ((TaskSchedulingInfo) taskManager.scheduleTasks(tablesToSchedule).get("MergeRollupTask")).getScheduledTaskNames();
            if (scheduledTaskNames == null || scheduledTaskNames.isEmpty()) {
                break;
            }

            waitForTaskToComplete();
            Assert.assertEquals(MetricValueUtils.getGaugeValue(this._controllerStarter.getControllerMetrics(), gauge100Days), expectedBuckets100Days[numTasks]);
            Assert.assertEquals(MetricValueUtils.getGaugeValue(this._controllerStarter.getControllerMetrics(), gauge200Days), expectedBuckets200Days[numTasks]);

            numTasks++;
        }
        Assert.assertEquals(numTasks, 8);

        dropRealtimeTable(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Shuts the test cluster down by calling the stop helpers in a fixed order (minion, server, broker,
     * controller, Kafka, ZooKeeper) and then deletes the temporary directory.
     *
     * @throws Exception if stopping a component or deleting the directory fails
     */
    @AfterClass
    public void tearDown() throws Exception {
        try {
            stopMinion();
            stopServer();
            stopBroker();
            stopController();
            stopKafka();
            stopZk();
        } finally {
            // Remove the temp directory even when a stop call throws, so a failed shutdown
            // does not leave test data behind on disk.
            FileUtils.deleteDirectory(this._tempDir);
        }
    }
}
