package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test for the minion {@code PurgeTask}.
 *
 * <p>{@link #setUp()} creates four offline tables holding the same segments. Each test method
 * exercises one table and drops it when done:
 * <ul>
 *   <li>{@code myTable1} - segments carry no last-purge timestamp.</li>
 *   <li>{@code myTable2} - last-purge timestamp is older than the configured {@code 1d} threshold.</li>
 *   <li>{@code myTable3} - last-purge timestamp is only a few seconds old.</li>
 *   <li>{@code myTable4} - schema and indexing config are extended with new columns before purging.</li>
 * </ul>
 */
public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
    private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
    private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
    private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
    private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";

    /**
     * All raw table names used by this test. Deliberately backed by {@code Arrays.asList} rather than
     * {@code List.of}: the record purger factory calls {@code contains} with whatever table name it is
     * handed, and {@code List.of(...).contains(null)} throws instead of returning {@code false}.
     */
    private static final List<String> PURGE_TABLES = Collections.unmodifiableList(
            Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
                    PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE));

    /** Task type used for scheduling, queue lookup, task-state lookup and task-metadata lookup. */
    private static final String PURGE_TASK_TYPE = "PurgeTask";
    /** Key in the segment ZK custom map under which the last purge time (epoch millis) is stored. */
    private static final String LAST_PURGE_TIME_KEY = "PurgeTask.time";
    /** Table task config key; set to {@code "1d"} for every table in this test. */
    private static final String LAST_PURGE_TIME_THRESHOLD_KEY = "lastPurgeTimeThresholdPeriod";

    private static final long ONE_DAY_MS = 86_400_000L;
    /** Age given to the last-purge timestamp of {@code myTable2}; slightly more than one day. */
    private static final long PASSED_DELTA_MS = 88_400_000L;
    /** Age given to the last-purge timestamp of {@code myTable3}; far less than one day. */
    private static final long NOT_PASSED_DELTA_MS = 4000L;

    /** Row count asserted for a table on which no purge was scheduled. */
    private static final long TOTAL_RECORD_COUNT = 115545L;
    /** Row count awaited for a table after its purge task completed. */
    private static final long RECORD_COUNT_AFTER_PURGE = 115493L;

    protected PinotHelixTaskResourceManager _helixTaskResourceManager;
    protected PinotTaskManager _taskManager;
    protected PinotHelixResourceManager _pinotHelixResourceManager;
    protected String _tableName;
    protected final File _segmentDataDir = new File(this._tempDir, "segmentDataDir");
    protected final File _segmentTarDir = new File(this._tempDir, "segmentTarDir");

    /**
     * Starts the cluster components, creates the four test tables, uploads the same segments to each
     * of them, installs the record purger and back-dates the last-purge timestamps of
     * {@code myTable2} and {@code myTable3}.
     *
     * @throws Exception if any cluster component, table or segment operation fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDataDir, _segmentTarDir});
        startZk();
        startController();
        startBroker();
        startServer();
        startMinion();

        Schema schema = null;
        TableConfig tableConfig = null;
        for (String rawTableName : PURGE_TABLES) {
            schema = createSchema();
            schema.setSchemaName(rawTableName);
            addSchema(schema);
            // createOfflineTableConfig() is expected to pick the name up through getTableName().
            setTableName(rawTableName);
            tableConfig = createOfflineTableConfig();
            tableConfig.setTaskConfig(getPurgeTaskConfig());
            addTableConfig(tableConfig);
        }

        // Segments are built once, using the schema/config of the last table, and uploaded to all tables.
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(unpackAvroData(_tempDir), tableConfig, schema, 0,
                _segmentDataDir, _segmentTarDir);
        for (String rawTableName : PURGE_TABLES) {
            uploadSegments(rawTableName, _segmentTarDir);
        }

        setRecordPurger();
        _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
        _taskManager = _controllerStarter.getTaskManager();
        _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();

        setLastPurgeTime(TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE),
                System.currentTimeMillis() - PASSED_DELTA_MS);
        setLastPurgeTime(TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE),
                System.currentTimeMillis() - NOT_PASSED_DELTA_MS);
    }

    /**
     * Overwrites the custom map of every segment of the given table with a single last-purge-time
     * entry and writes the metadata back to ZooKeeper.
     */
    private void setLastPurgeTime(String tableNameWithType, long lastPurgeTimeMs) throws Exception {
        Map<String, String> customMap = new HashMap<>();
        customMap.put(LAST_PURGE_TIME_KEY, String.valueOf(lastPurgeTimeMs));
        for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
            segmentZKMetadata.setCustomMap(customMap);
            _pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
        }
    }

    /**
     * Installs a record purger factory on the minion context. For the four test tables it returns a
     * purger that matches rows whose {@code ArrTime} value equals the integer 1; for any other table
     * name it returns {@code null}.
     */
    private void setRecordPurger() {
        MinionContext.getInstance().setRecordPurgerFactory(rawTableName -> {
            if (PURGE_TABLES.contains(rawTableName)) {
                // Constant-first equals: a row without an ArrTime value is simply not matched.
                return row -> Integer.valueOf(1).equals(row.getValue("ArrTime"));
            }
            return null;
        });
    }

    /** Returns the raw table name most recently set through {@link #setTableName(String)}. */
    public String getTableName() {
        return this._tableName;
    }

    /** Sets the raw table name returned by {@link #getTableName()}. */
    public void setTableName(String str) {
        this._tableName = str;
    }

    /** Builds a task config that enables {@code PurgeTask} with a last-purge threshold of {@code 1d}. */
    private TableTaskConfig getPurgeTaskConfig() {
        Map<String, String> purgeTaskProperties = new HashMap<>();
        purgeTaskProperties.put(LAST_PURGE_TIME_THRESHOLD_KEY, "1d");
        return new TableTaskConfig(Collections.singletonMap(PURGE_TASK_TYPE, purgeTaskProperties));
    }

    /** Scheduling context restricted to the given table and to the purge task type. */
    private TaskSchedulingContext purgeOnlyContext(String tableNameWithType) {
        return new TaskSchedulingContext()
                .setTablesToSchedule(Collections.singleton(tableNameWithType))
                .setTasksToSchedule(Collections.singleton(PURGE_TASK_TYPE));
    }

    /**
     * Schedules tasks for the table, asserts that a purge task and its Helix job queue exist, asserts
     * that an immediate second attempt schedules nothing, then waits for all purge tasks to complete.
     */
    private void schedulePurgeTaskAndWait(String tableNameWithType) throws Exception {
        TaskSchedulingContext tableOnlyContext =
                new TaskSchedulingContext().setTablesToSchedule(Collections.singleton(tableNameWithType));
        Assert.assertNotNull(_taskManager.scheduleTasks(tableOnlyContext).get(PURGE_TASK_TYPE));
        Assert.assertTrue(_helixTaskResourceManager.getTaskQueues()
                .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(PURGE_TASK_TYPE)));
        // Presumably rejected because the task scheduled just above has not finished yet.
        MinionTaskTestUtils.assertNoTaskSchedule(purgeOnlyContext(tableNameWithType), _taskManager);
        waitForTaskToComplete();
    }

    /** Waits up to 60 seconds for {@code COUNT(*)} on the table to reach the post-purge row count. */
    private void waitForPurgedRecordCount(String rawTableName) {
        TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(rawTableName) == RECORD_COUNT_AFTER_PURGE,
                60000L, "Failed to get expected purged records");
    }

    /**
     * Purges a table whose segments have no last-purge timestamp and verifies that every segment gets
     * one, that no further task is scheduled afterwards and that the row count drops.
     */
    @Test
    public void testFirstRunPurge() throws Exception {
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
        schedulePurgeTaskAndWait(tableNameWithType);

        for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
            Assert.assertTrue(segmentZKMetadata.getCustomMap().containsKey(LAST_PURGE_TIME_KEY));
        }
        MinionTaskTestUtils.assertNoTaskSchedule(purgeOnlyContext(tableNameWithType), _taskManager);
        waitForPurgedRecordCount(PURGE_FIRST_RUN_TABLE);

        dropOfflineTable(PURGE_FIRST_RUN_TABLE);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Purges a table whose last-purge timestamp is older than one day and verifies that the timestamp
     * is refreshed to within the last day, that no further task is scheduled afterwards and that the
     * row count drops.
     */
    @Test
    public void testPassedDelayTimePurge() throws Exception {
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
        schedulePurgeTaskAndWait(tableNameWithType);

        for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
            String lastPurgeTime = segmentZKMetadata.getCustomMap().get(LAST_PURGE_TIME_KEY);
            Assert.assertNotNull(lastPurgeTime);
            Assert.assertTrue(System.currentTimeMillis() - Long.parseLong(lastPurgeTime) < ONE_DAY_MS);
        }
        MinionTaskTestUtils.assertNoTaskSchedule(purgeOnlyContext(tableNameWithType), _taskManager);
        waitForPurgedRecordCount(PURGE_DELTA_PASSED_TABLE);

        dropOfflineTable(PURGE_DELTA_PASSED_TABLE);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Verifies that no purge task is scheduled for a table whose last-purge timestamp is only seconds
     * old, and that its timestamps and row count are left untouched.
     */
    @Test
    public void testNotPassedDelayTimePurge() throws Exception {
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
        MinionTaskTestUtils.assertNoTaskSchedule(purgeOnlyContext(tableNameWithType), _taskManager);

        for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
            String lastPurgeTime = segmentZKMetadata.getCustomMap().get(LAST_PURGE_TIME_KEY);
            Assert.assertNotNull(lastPurgeTime);
            long lastPurgeTimeMs = Long.parseLong(lastPurgeTime);
            // Still the value written in setUp(): older than the offset used there, younger than a day.
            Assert.assertTrue(System.currentTimeMillis() - lastPurgeTimeMs > NOT_PASSED_DELTA_MS);
            Assert.assertTrue(System.currentTimeMillis() - lastPurgeTimeMs < ONE_DAY_MS);
        }
        Assert.assertEquals(getCurrentCountStarResult(PURGE_DELTA_NOT_PASSED_TABLE), TOTAL_RECORD_COUNT);

        dropOfflineTable(PURGE_DELTA_NOT_PASSED_TABLE);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Adds two new INT dimension columns to the schema, configures an inverted index on one and a
     * range index on the other, then purges the already-uploaded segments and verifies the outcome.
     */
    @Test
    public void testPurgeOnOldSegmentsWithIndicesOnNewColumns() throws Exception {
        Schema schema = createSchema();
        schema.addField(new DimensionFieldSpec("ColumnABC", FieldSpec.DataType.INT, true));
        schema.addField(new DimensionFieldSpec("ColumnXYZ", FieldSpec.DataType.INT, true));
        schema.setSchemaName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
        updateSchema(schema);

        setTableName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
        TableConfig tableConfig = createOfflineTableConfig();
        tableConfig.setTaskConfig(getPurgeTaskConfig());
        IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
        // Copy before appending so the lists returned by the config are not mutated in place.
        List<String> invertedIndexColumns = new ArrayList<>(indexingConfig.getInvertedIndexColumns());
        invertedIndexColumns.add("ColumnABC");
        List<String> rangeIndexColumns = new ArrayList<>(indexingConfig.getRangeIndexColumns());
        rangeIndexColumns.add("ColumnXYZ");
        indexingConfig.setInvertedIndexColumns(invertedIndexColumns);
        indexingConfig.setRangeIndexColumns(rangeIndexColumns);
        updateTableConfig(tableConfig);

        String tableNameWithType =
                TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
        schedulePurgeTaskAndWait(tableNameWithType);

        for (SegmentZKMetadata segmentZKMetadata : _pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType)) {
            Assert.assertTrue(segmentZKMetadata.getCustomMap().containsKey(LAST_PURGE_TIME_KEY));
        }
        waitForPurgedRecordCount(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);

        dropOfflineTable(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
        verifyTableDelete(tableNameWithType);
    }

    /**
     * Waits up to 60 seconds, polling every second, until neither segment lineage nor purge task
     * metadata can be read from the property store for the given table.
     *
     * @param str table name with type
     */
    protected void verifyTableDelete(String str) {
        TestUtils.waitForCondition(
                aVoid -> SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, str) == null
                        && MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, PURGE_TASK_TYPE, str) == null,
                1000L, 60000L, "Failed to delete table");
    }

    /** Waits up to 10 minutes until every known purge task is in state {@code COMPLETED}. */
    protected void waitForTaskToComplete() {
        TestUtils.waitForCondition(aVoid -> {
            for (TaskState taskState : _helixTaskResourceManager.getTaskStates(PURGE_TASK_TYPE).values()) {
                if (taskState != TaskState.COMPLETED) {
                    return false;
                }
            }
            return true;
        }, 600000L, "Failed to complete task");
    }

    /**
     * Stops the cluster components in reverse start order and removes the temp directory.
     *
     * @throws Exception if stopping a component or deleting the directory fails
     */
    @AfterClass
    public void tearDown() throws Exception {
        try {
            stopMinion();
            stopServer();
            stopBroker();
            stopController();
            stopZk();
        } finally {
            // Remove the temp directory even when a component fails to stop cleanly.
            FileUtils.deleteDirectory(_tempDir);
        }
    }
}
