package org.apache.pinot.integration.tests;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.class */
public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest {
    private PinotHelixTaskResourceManager _helixTaskResourceManager;
    private PinotTaskManager _taskManager;
    private PinotHelixResourceManager _pinotHelixResourceManager;
    private long _dataSmallestTimeMs;
    private String _realtimeTableName;
    private String _offlineTableName;

    protected TableTaskConfig getTaskConfig() {
        return new TableTaskConfig(Collections.singletonMap("RealtimeToOfflineSegmentsTask", new HashMap()));
    }

    protected boolean useLlc() {
        return true;
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @BeforeClass
    public void setUp() throws Exception {
        super.setUp();
        addTableConfig(createOfflineTableConfig());
        startMinion();
        this._helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        this._taskManager = this._controllerStarter.getTaskManager();
        this._pinotHelixResourceManager = this._controllerStarter.getHelixResourceManager();
        this._realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        this._offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
        long j = Long.MAX_VALUE;
        for (SegmentZKMetadata segmentZKMetadata : this._pinotHelixResourceManager.getSegmentsZKMetadata(this._realtimeTableName)) {
            if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
                j = Math.min(j, segmentZKMetadata.getStartTimeMs());
            }
        }
        this._dataSmallestTimeMs = j;
    }

    @Test
    public void testRealtimeToOfflineSegmentsTask() throws IOException {
        Assert.assertTrue(this._pinotHelixResourceManager.getSegmentsZKMetadata(this._offlineTableName).isEmpty());
        SegmentPartitionConfig segmentPartitionConfig = getOfflineTableConfig().getIndexingConfig().getSegmentPartitionConfig();
        int intValue = segmentPartitionConfig != null ? ((Integer) segmentPartitionConfig.getColumnPartitionMap().values().stream().map((v0) -> {
            return v0.getNumPartitions();
        }).reduce((num, num2) -> {
            return Integer.valueOf(num.intValue() * num2.intValue());
        }).orElseThrow(() -> {
            return new RuntimeException("Expected accumulated result but not found.");
        })).intValue() : 1;
        long j = this._dataSmallestTimeMs + 86400000;
        for (int i = 0; i < 3; i++) {
            Assert.assertNotNull(this._taskManager.scheduleTasks().get("RealtimeToOfflineSegmentsTask"));
            Assert.assertTrue(this._helixTaskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName("RealtimeToOfflineSegmentsTask")));
            Assert.assertNull(this._taskManager.scheduleTasks().get("RealtimeToOfflineSegmentsTask"));
            waitForTaskToComplete(j);
            List segmentsZKMetadata = this._pinotHelixResourceManager.getSegmentsZKMetadata(this._offlineTableName);
            Assert.assertEquals(segmentsZKMetadata.size(), intValue * (i + 1));
            long j2 = j - 86400000;
            for (int i2 = intValue * i; i2 < segmentsZKMetadata.size(); i2++) {
                SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) segmentsZKMetadata.get(i2);
                Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), j2);
                Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), j2);
                if (segmentPartitionConfig != null) {
                    Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(), segmentPartitionConfig.getColumnPartitionMap().keySet());
                    Iterator it = segmentPartitionConfig.getColumnPartitionMap().keySet().iterator();
                    while (it.hasNext()) {
                        Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions((String) it.next()).size(), 1);
                    }
                }
            }
            j += 86400000;
        }
        testHardcodedQueries();
        dropRealtimeTable(this._realtimeTableName);
        verifyTableDelete(this._realtimeTableName);
    }

    @Nullable
    protected SegmentPartitionConfig getSegmentPartitionConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("AirlineID", new ColumnPartitionConfig("murmur", 3));
        hashMap.put("OriginAirportID", new ColumnPartitionConfig("hashcode", 2));
        return new SegmentPartitionConfig(hashMap);
    }

    protected void verifyTableDelete(String str) {
        TestUtils.waitForCondition(r6 -> {
            return MinionTaskMetadataUtils.fetchTaskMetadata(this._propertyStore, "RealtimeToOfflineSegmentsTask", str) == null;
        }, 1000L, 60000L, "Failed to delete table");
    }

    private void waitForTaskToComplete(long j) {
        TestUtils.waitForCondition(r4 -> {
            Iterator it = this._helixTaskResourceManager.getTaskStates("RealtimeToOfflineSegmentsTask").values().iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.COMPLETED) {
                    return false;
                }
            }
            return true;
        }, 600000L, "Failed to complete task");
        ZNRecord minionTaskMetadataZNRecord = this._taskManager.getClusterInfoAccessor().getMinionTaskMetadataZNRecord("RealtimeToOfflineSegmentsTask", this._realtimeTableName);
        RealtimeToOfflineSegmentsTaskMetadata fromZNRecord = minionTaskMetadataZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(minionTaskMetadataZNRecord) : null;
        Assert.assertNotNull(fromZNRecord);
        Assert.assertEquals(fromZNRecord.getWatermarkMs(), j);
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testDictionaryBasedQueries() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testHardcodedQueries() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testQueriesFromQueryFile() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testGeneratedQueriesWithMultiValues() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testQueryExceptions() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testInstanceShutdown() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @AfterClass
    public void tearDown() throws Exception {
        stopMinion();
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
