package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.class */
public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
    protected PinotHelixTaskResourceManager _helixTaskResourceManager;
    protected PinotTaskManager _taskManager;
    private static final String PRIMARY_KEY_COL = "clientId";
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType("mytable");
    private static List<File> _avroFiles;
    private TableConfig _tableConfig;
    private Schema _schema;

    protected String getSchemaFileName() {
        return "upsert_upload_segment_test.schema";
    }

    protected String getSchemaName() {
        return "upsertSchema";
    }

    protected String getAvroTarFileName() {
        return "upsert_compaction_test.tar.gz";
    }

    protected String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    private TableTaskConfig getCompactionTaskConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("bufferTimePeriod", "0d");
        hashMap.put("invalidRecordsThresholdPercent", "1");
        hashMap.put("invalidRecordsThresholdCount", "10");
        return new TableTaskConfig(Collections.singletonMap("UpsertCompactionTask", hashMap));
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServers(1);
        _avroFiles = unpackAvroData(this._tempDir);
        startKafka();
        this._schema = createSchema();
        addSchema(this._schema);
        this._tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
        this._tableConfig.setTaskConfig(getCompactionTaskConfig());
        addTableConfig(this._tableConfig);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, this._tableConfig, this._schema, 0, this._segmentDir, this._tarDir);
        startMinion();
        this._helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        this._taskManager = this._controllerStarter.getTaskManager();
    }

    @BeforeMethod
    public void beforeMethod() throws Exception {
        uploadSegments(getTableName(), TableType.REALTIME, this._tarDir);
    }

    protected void waitForAllDocsLoaded(long j, long j2) throws Exception {
        TestUtils.waitForCondition(r8 -> {
            try {
                return Boolean.valueOf(getCurrentCountStarResultWithoutUpsert() == j2);
            } catch (Exception e) {
                return null;
            }
        }, 100L, j, "Failed to load all documents");
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
    }

    private long getCurrentCountStarResultWithoutUpsert() {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
    }

    private long getSalary() {
        return getPinotConnection().execute("SELECT salary FROM " + getTableName() + " WHERE clientId=100001").getResultSet(0).getLong(0);
    }

    protected long getCountStarResult() {
        return 3L;
    }

    @AfterMethod
    public void afterMethod() throws Exception {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        List listSegments = listSegments(tableNameWithType);
        Assert.assertFalse(listSegments.isEmpty());
        Iterator it = listSegments.iterator();
        while (it.hasNext()) {
            dropSegment(tableNameWithType, (String) it.next());
        }
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(listSegments(tableNameWithType).isEmpty());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to drop the segments");
        stopServer();
        startServers(1);
    }

    @AfterClass
    public void tearDown() throws IOException {
        dropRealtimeTable(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
        stopMinion();
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    @Test
    public void testCompaction() throws Exception {
        waitForAllDocsLoaded(600000L, 283L);
        Assert.assertEquals(getSalary(), 9747108L);
        Assert.assertNotNull(this._taskManager.scheduleTasks(REALTIME_TABLE_NAME).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded(600000L, 3L);
        Assert.assertEquals(getSalary(), 9747108L);
    }

    @Test
    public void testCompactionDeletesSegments() throws Exception {
        pushAvroIntoKafka(_avroFiles);
        waitForAllDocsLoaded(600000L, 566L);
        Assert.assertEquals(getSalary(), 9747108L);
        Assert.assertNull(this._taskManager.scheduleTasks(REALTIME_TABLE_NAME).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded(600000L, 283L);
        Assert.assertEquals(getSalary(), 9747108L);
    }

    protected void waitForTaskToComplete() {
        TestUtils.waitForCondition(r4 -> {
            Iterator it = this._helixTaskResourceManager.getTaskStates("UpsertCompactionTask").values().iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.COMPLETED) {
                    return false;
                }
            }
            return true;
        }, 600000L, "Failed to complete task");
    }
}
