package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.integration.tests.ClusterTest;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TimestampConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.class */
public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseClusterIntegrationTestSet {
    private PinotHelixTaskResourceManager _taskResourceManager;
    private PinotTaskManager _taskManager;
    private String _realtimeTableName;
    private String _offlineTableName;
    private String _realtimeMetadataTableName;
    private String _offlineMetadataTableName;
    private long _dataSmallestTimeMs;
    private long _dataSmallestMetadataTableTimeMs;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public SegmentPartitionConfig getSegmentPartitionConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("AirlineID", new ColumnPartitionConfig("murmur", 3));
        hashMap.put("OriginAirportID", new ColumnPartitionConfig("hashcode", 2));
        return new SegmentPartitionConfig(hashMap);
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(this._tempDir);
        startZk();
        startController();
        startBroker();
        startServer();
        startMinion();
        startKafka();
        List<File> unpackAvroData = unpackAvroData(this._tempDir);
        Schema createSchema = createSchema();
        createSchema.addField(new DateTimeFieldSpec("ts", FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS"));
        addSchema(createSchema);
        TableConfig createRealtimeTableConfig = createRealtimeTableConfig(unpackAvroData.get(0));
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setTransformConfigs(Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)")));
        createRealtimeTableConfig.setIngestionConfig(ingestionConfig);
        FieldConfig fieldConfig = new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null, new TimestampConfig(Arrays.asList(TimestampIndexGranularity.HOUR, TimestampIndexGranularity.DAY, TimestampIndexGranularity.WEEK, TimestampIndexGranularity.MONTH)), null);
        createRealtimeTableConfig.setFieldConfigList(Collections.singletonList(fieldConfig));
        HashMap hashMap = new HashMap();
        hashMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
        createRealtimeTableConfig.setTaskConfig(new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, hashMap)));
        addTableConfig(createRealtimeTableConfig);
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        createOfflineTableConfig.setFieldConfigList(Collections.singletonList(fieldConfig));
        addTableConfig(createOfflineTableConfig);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
        hashMap2.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
        TableConfig createRealtimeTableConfig2 = createRealtimeTableConfig(unpackAvroData.get(0), "myTable2", new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, hashMap2)));
        createRealtimeTableConfig2.setIngestionConfig(ingestionConfig);
        createRealtimeTableConfig2.setFieldConfigList(Collections.singletonList(fieldConfig));
        addTableConfig(createRealtimeTableConfig2);
        TableConfig createOfflineTableConfig2 = createOfflineTableConfig("myTable2", null, getSegmentPartitionConfig());
        createOfflineTableConfig2.setFieldConfigList(Collections.singletonList(fieldConfig));
        addTableConfig(createOfflineTableConfig2);
        pushAvroIntoKafka(unpackAvroData);
        setUpH2Connection(unpackAvroData);
        waitForAllDocsLoaded(600000L);
        waitForDocsLoaded(600000L, true, "myTable2");
        this._taskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        this._taskManager = this._controllerStarter.getTaskManager();
        this._realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        this._offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
        this._realtimeMetadataTableName = TableNameBuilder.REALTIME.tableNameWithType("myTable2");
        this._offlineMetadataTableName = TableNameBuilder.OFFLINE.tableNameWithType("myTable2");
        long j = Long.MAX_VALUE;
        for (SegmentZKMetadata segmentZKMetadata : this._helixResourceManager.getSegmentsZKMetadata(this._realtimeTableName)) {
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                j = Math.min(j, segmentZKMetadata.getStartTimeMs());
            }
        }
        this._dataSmallestTimeMs = j;
        long j2 = Long.MAX_VALUE;
        for (SegmentZKMetadata segmentZKMetadata2 : this._helixResourceManager.getSegmentsZKMetadata(this._realtimeMetadataTableName)) {
            if (segmentZKMetadata2.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                j2 = Math.min(j2, segmentZKMetadata2.getStartTimeMs());
            }
        }
        this._dataSmallestMetadataTableTimeMs = j2;
    }

    private TableConfig createOfflineTableConfig(String str, @Nullable TableTaskConfig tableTaskConfig, @Nullable SegmentPartitionConfig segmentPartitionConfig) {
        return new TableConfigBuilder(TableType.OFFLINE).setTableName(str).setSchemaName(getSchemaName()).setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(tableTaskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(segmentPartitionConfig).build();
    }

    protected TableConfig createRealtimeTableConfig(File file, String str, TableTaskConfig tableTaskConfig) {
        ClusterTest.AvroFileSchemaKafkaAvroMessageDecoder._avroFile = file;
        return new TableConfigBuilder(TableType.REALTIME).setTableName(str).setSchemaName(getSchemaName()).setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(tableTaskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryconfig()).setLLC(useLlc()).setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
    }

    @Test
    public void testRealtimeToOfflineSegmentsTask() throws Exception {
        Assert.assertTrue(this._helixResourceManager.getSegmentsZKMetadata(this._offlineTableName).isEmpty());
        SegmentPartitionConfig segmentPartitionConfig = getOfflineTableConfig().getIndexingConfig().getSegmentPartitionConfig();
        int intValue = segmentPartitionConfig != null ? ((Integer) segmentPartitionConfig.getColumnPartitionMap().values().stream().map((v0) -> {
            return v0.getNumPartitions();
        }).reduce((num, num2) -> {
            return Integer.valueOf(num.intValue() * num2.intValue());
        }).orElseThrow(() -> {
            return new RuntimeException("Expected accumulated result but not found.");
        })).intValue() : 1;
        long j = this._dataSmallestTimeMs + 86400000;
        for (int i = 0; i < 3; i++) {
            Assert.assertNotNull(this._taskManager.scheduleTasks(this._realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
            Assert.assertTrue(this._taskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
            Assert.assertNull(this._taskManager.scheduleTasks(this._realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
            waitForTaskToComplete(j, this._realtimeTableName);
            List<SegmentZKMetadata> segmentsZKMetadata = this._helixResourceManager.getSegmentsZKMetadata(this._offlineTableName);
            Assert.assertEquals(segmentsZKMetadata.size(), intValue * (i + 1));
            long j2 = j - 86400000;
            for (int i2 = intValue * i; i2 < segmentsZKMetadata.size(); i2++) {
                SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(i2);
                Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), j2);
                Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), j2);
                if (segmentPartitionConfig != null) {
                    Assert.assertEquals((Set<?>) segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(), (Set<?>) segmentPartitionConfig.getColumnPartitionMap().keySet());
                    Iterator<String> it2 = segmentPartitionConfig.getColumnPartitionMap().keySet().iterator();
                    while (it2.hasNext()) {
                        Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(it2.next()).size(), 1);
                    }
                }
            }
            j += 86400000;
        }
        testHardcodedQueries();
    }

    @Test
    public void testRealtimeToOfflineSegmentsMetadataPushTask() throws Exception {
        Assert.assertTrue(this._helixResourceManager.getSegmentsZKMetadata(this._offlineMetadataTableName).isEmpty());
        SegmentPartitionConfig segmentPartitionConfig = getOfflineTableConfig().getIndexingConfig().getSegmentPartitionConfig();
        int intValue = segmentPartitionConfig != null ? ((Integer) segmentPartitionConfig.getColumnPartitionMap().values().stream().map((v0) -> {
            return v0.getNumPartitions();
        }).reduce((num, num2) -> {
            return Integer.valueOf(num.intValue() * num2.intValue());
        }).orElseThrow(() -> {
            return new RuntimeException("Expected accumulated result but not found.");
        })).intValue() : 1;
        long j = this._dataSmallestMetadataTableTimeMs + 86400000;
        this._taskManager.cleanUpTask();
        for (int i = 0; i < 3; i++) {
            Assert.assertNotNull(this._taskManager.scheduleTasks(this._realtimeMetadataTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
            Assert.assertTrue(this._taskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
            Assert.assertNull(this._taskManager.scheduleTasks(this._realtimeMetadataTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
            waitForTaskToComplete(j, this._realtimeMetadataTableName);
            List<SegmentZKMetadata> segmentsZKMetadata = this._helixResourceManager.getSegmentsZKMetadata(this._offlineMetadataTableName);
            Assert.assertEquals(segmentsZKMetadata.size(), intValue * (i + 1));
            long j2 = j - 86400000;
            for (int i2 = intValue * i; i2 < segmentsZKMetadata.size(); i2++) {
                SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(i2);
                Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), j2);
                Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), j2);
                if (segmentPartitionConfig != null) {
                    Assert.assertEquals((Set<?>) segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(), (Set<?>) segmentPartitionConfig.getColumnPartitionMap().keySet());
                    Iterator<String> it2 = segmentPartitionConfig.getColumnPartitionMap().keySet().iterator();
                    while (it2.hasNext()) {
                        Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(it2.next()).size(), 1);
                    }
                }
            }
            j += 86400000;
        }
    }

    private void waitForTaskToComplete(long j, String str) {
        TestUtils.waitForCondition(r4 -> {
            Iterator<TaskState> it2 = this._taskResourceManager.getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values().iterator();
            while (it2.hasNext()) {
                if (it2.next() != TaskState.COMPLETED) {
                    return false;
                }
            }
            return true;
        }, 600000L, "Failed to complete task");
        ZNRecord minionTaskMetadataZNRecord = this._taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, str);
        RealtimeToOfflineSegmentsTaskMetadata fromZNRecord = minionTaskMetadataZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(minionTaskMetadataZNRecord) : null;
        Assert.assertNotNull(fromZNRecord);
        Assert.assertEquals(fromZNRecord.getWatermarkMs(), j);
    }

    @AfterClass
    public void tearDown() throws Exception {
        dropRealtimeTable(this._realtimeTableName);
        Assert.assertNull(MinionTaskMetadataUtils.fetchTaskMetadata(this._propertyStore, MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, this._realtimeTableName));
        dropOfflineTable(this._offlineTableName);
        stopMinion();
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
