package org.apache.pinot.integration.tests;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.TaskState;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertTableIntegrationTest.class */
public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final String INPUT_DATA_SMALL_TAR_FILE = "gameScores_csv.tar.gz";
    private static final String INPUT_DATA_LARGE_TAR_FILE = "gameScores_large_csv.tar.gz";
    private static final String CSV_SCHEMA_HEADER = "playerId,name,game,score,timestampInEpoch,deleted";
    private static final String PARTIAL_UPSERT_TABLE_SCHEMA = "partial_upsert_table_test.schema";
    private static final String CSV_DELIMITER = ",";
    private static final String TABLE_NAME = "gameScores";
    private static final int NUM_SERVERS = 2;
    private static final String DELETE_COL = "deleted";
    public static final String PRIMARY_KEY_COL = "playerId";
    public static final String TIME_COL_NAME = "timestampInEpoch";
    public static final String UPSERT_SCHEMA_FILE_NAME = "upsert_table_test.schema";
    protected PinotTaskManager _taskManager;
    protected PinotHelixTaskResourceManager _helixTaskResourceManager;

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServers(2);
        startMinion();
        startKafka();
        setupTable(getTableName(), getKafkaTopic(), INPUT_DATA_SMALL_TAR_FILE, null);
        waitForAllDocsLoaded(60000L);
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
        addSchema(createSchema(PARTIAL_UPSERT_TABLE_SCHEMA));
        this._taskManager = this._controllerStarter.getTaskManager();
        this._helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
    }

    @AfterClass
    public void tearDown() throws IOException {
        dropRealtimeTable(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    protected String getSchemaFileName() {
        return UPSERT_SCHEMA_FILE_NAME;
    }

    @Nullable
    protected String getTimeColumnName() {
        return TIME_COL_NAME;
    }

    protected String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    protected String getTableName() {
        return TABLE_NAME;
    }

    protected long getCountStarResult() {
        return 3L;
    }

    protected int getRealtimeSegmentFlushSize() {
        return 500;
    }

    private long getCountStarResultWithoutUpsert() {
        return 10L;
    }

    private Schema createSchema(String str) throws IOException {
        InputStream resourceAsStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(str);
        Assert.assertNotNull(resourceAsStream);
        return Schema.fromInputStream(resourceAsStream);
    }

    private long queryCountStarWithoutUpsert(String str) {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + str + " OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
    }

    private long queryCountStar(String str) {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + str).getResultSet(0).getLong(0);
    }

    protected void waitForAllDocsLoaded(long j) {
        waitForAllDocsLoaded(getTableName(), j, getCountStarResultWithoutUpsert());
    }

    private void waitForAllDocsLoaded(String str, long j, long j2) {
        TestUtils.waitForCondition(r9 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(str) == j2);
            } catch (Exception e) {
                return null;
            }
        }, 100L, j, "Failed to load all documents");
    }

    private void waitForNumQueriedSegmentsToConverge(String str, long j, long j2) {
        TestUtils.waitForCondition(r9 -> {
            try {
                return Boolean.valueOf(getNumSegmentsQueried(str) == j2);
            } catch (Exception e) {
                return null;
            }
        }, 100L, j, "Failed to load all documents");
    }

    @Test
    protected void testDeleteWithFullUpsert() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        testDeleteWithFullUpsert(getKafkaTopic() + "-with-deletes", "gameScoresWithDelete", upsertConfig);
    }

    protected void testDeleteWithFullUpsert(String str, String str2, UpsertConfig upsertConfig) throws Exception {
        setupTable(str2, str, INPUT_DATA_SMALL_TAR_FILE, upsertConfig);
        pushCsvIntoKafka(ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true", "100,Zook,counter-strike,2050,1681377200000,true"), str, 0);
        waitForAllDocsLoaded(str2, 600000L, 12L);
        ResultSet resultSet = getPinotConnection().execute("SELECT * FROM " + str2).getResultSet(0);
        Assert.assertEquals(resultSet.getRowCount(), 1);
        int columnCount = resultSet.getColumnCount();
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= columnCount) {
                break;
            }
            if (PRIMARY_KEY_COL.equalsIgnoreCase(resultSet.getColumnName(i2))) {
                i = i2;
                break;
            }
            i2++;
        }
        Assert.assertNotEquals(Integer.valueOf(i), -1);
        Assert.assertEquals(resultSet.getString(0, i), "101");
        ResultSet resultSet2 = getPinotConnection().execute("SELECT playerId FROM " + str2 + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
        Assert.assertEquals(resultSet2.getRowCount(), 2);
        for (int i3 = 0; i3 < resultSet2.getRowCount(); i3++) {
            String string = resultSet2.getString(i3, 0);
            Assert.assertTrue("100".equalsIgnoreCase(string) || "102".equalsIgnoreCase(string));
        }
        pushCsvIntoKafka(Collections.singletonList("100,Zook-New,,0.0,1684707335000,false"), str, 0);
        waitForAllDocsLoaded(str2, 600000L, 13L);
        ResultSet resultSet3 = getPinotConnection().execute("SELECT playerId, name, game FROM " + str2 + " WHERE playerId = 100").getResultSet(0);
        Assert.assertEquals(resultSet3.getRowCount(), 1);
        Assert.assertEquals(resultSet3.getInt(0, 0), 100);
        Assert.assertEquals(resultSet3.getString(0, 1), "Zook-New");
        Assert.assertEquals(resultSet3.getString(0, 2), "null");
        Assert.assertTrue(getPinotConnection().execute("SELECT playerId, name FROM " + str2 + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0).getRowCount() > 1);
        dropRealtimeTable(str2);
    }

    private TableConfig setupTable(String str, String str2, String str3, UpsertConfig upsertConfig) throws Exception {
        Map cSVDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
        Schema createSchema = createSchema();
        createSchema.setSchemaName(str);
        addSchema(createSchema);
        TableConfig createCSVUpsertTableConfig = createCSVUpsertTableConfig(str, str2, getNumKafkaPartitions(), cSVDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
        addTableConfig(createCSVUpsertTableConfig);
        pushCsvIntoKafka((File) unpackTarData(str3, this._tempDir).get(0), str2, 0);
        return createCSVUpsertTableConfig;
    }

    @Test
    public void testDeleteWithPartialUpsert() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes", "gameScoresPartialUpsertWithDelete", upsertConfig);
    }

    protected void testDeleteWithPartialUpsert(String str, String str2, UpsertConfig upsertConfig) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("game", UpsertConfig.Strategy.UNION);
        hashMap.put("score", UpsertConfig.Strategy.INCREMENT);
        hashMap.put(DELETE_COL, UpsertConfig.Strategy.OVERWRITE);
        upsertConfig.setPartialUpsertStrategies(hashMap);
        Map cSVDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
        Schema createSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
        createSchema.setSchemaName(str2);
        addSchema(createSchema);
        addTableConfig(createCSVUpsertTableConfig(str2, str, getNumKafkaPartitions(), cSVDecoderProperties, upsertConfig, PRIMARY_KEY_COL));
        pushCsvIntoKafka((File) unpackTarData("gameScores_partial_upsert_csv.tar.gz", this._tempDir).get(0), str, 0);
        pushCsvIntoKafka(ImmutableList.of("102,Clifford,counter-strike,102,1681054200000,true", "100,Zook,counter-strike,2050,1681377200000,true"), str, 0);
        waitForAllDocsLoaded(str2, 600000L, 12L);
        ResultSet resultSet = getPinotConnection().execute("SELECT * FROM " + str2).getResultSet(0);
        Assert.assertEquals(resultSet.getRowCount(), 1);
        int columnCount = resultSet.getColumnCount();
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= columnCount) {
                break;
            }
            if (PRIMARY_KEY_COL.equalsIgnoreCase(resultSet.getColumnName(i2))) {
                i = i2;
                break;
            }
            i2++;
        }
        Assert.assertNotEquals(Integer.valueOf(i), -1);
        Assert.assertEquals(resultSet.getString(0, i), "101");
        ResultSet resultSet2 = getPinotConnection().execute("SELECT playerId FROM " + str2 + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
        Assert.assertEquals(resultSet2.getRowCount(), 2);
        for (int i3 = 0; i3 < resultSet2.getRowCount(); i3++) {
            String string = resultSet2.getString(i3, 0);
            Assert.assertTrue("100".equalsIgnoreCase(string) || "102".equalsIgnoreCase(string));
        }
        pushCsvIntoKafka(Collections.singletonList("100,Zook,,0.0,1684707335000,false"), str, 0);
        waitForAllDocsLoaded(str2, 600000L, 13L);
        ResultSet resultSet3 = getPinotConnection().execute("SELECT playerId, name, game FROM " + str2 + " WHERE playerId = 100").getResultSet(0);
        Assert.assertEquals(resultSet3.getRowCount(), 1);
        Assert.assertEquals(resultSet3.getInt(0, 0), 100);
        Assert.assertEquals(resultSet3.getString(0, 1), "Zook");
        Assert.assertEquals(resultSet3.getString(0, 2), "[\"null\"]");
        Assert.assertTrue(getPinotConnection().execute("SELECT playerId, name FROM " + str2 + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0).getRowCount() > 1);
        dropRealtimeTable(str2);
    }

    @Test
    public void testDefaultMetadataManagerClass() throws Exception {
        PinotConfiguration serverConf = getServerConf(12345);
        serverConf.setProperty(Joiner.on(".").join("pinot.server.instance", "upsert", new Object[]{"default.metadata.manager.class"}), DummyTableUpsertMetadataManager.class.getName());
        BaseServerStarter baseServerStarter = null;
        try {
            BaseServerStarter startOneServer = startOneServer(serverConf);
            InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(startOneServer.getServerInstance().getHelixManager(), startOneServer.getInstanceId());
            instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), Arrays.asList(StringUtils.split("DummyTag_REALTIME,DummyTag_OFFLINE", ',')));
            if (!this._helixDataAccessor.setProperty(this._helixDataAccessor.keyBuilder().instanceConfig(startOneServer.getInstanceId()), instanceConfig)) {
                throw new RuntimeException("Failed to set instance config for instance: " + startOneServer.getInstanceId());
            }
            TableConfig createCSVUpsertTableConfig = createCSVUpsertTableConfig("dummyTable123", getKafkaTopic(), getNumKafkaPartitions(), getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER), null, PRIMARY_KEY_COL);
            createCSVUpsertTableConfig.setTenantConfig(new TenantConfig("DefaultTenant", "DummyTag", (TagOverrideConfig) null));
            Schema createSchema = createSchema();
            createSchema.setSchemaName("dummyTable123");
            addSchema(createSchema);
            addTableConfig(createCSVUpsertTableConfig);
            Thread.sleep(1000L);
            Assert.assertTrue(startOneServer.getServerInstance().getInstanceDataManager().getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("dummyTable123")).getTableUpsertMetadataManager() instanceof DummyTableUpsertMetadataManager);
            dropRealtimeTable("dummyTable123");
            deleteSchema("dummyTable123");
            waitForEVToDisappear("dummyTable123");
            if (startOneServer != null) {
                startOneServer.stop();
            }
            SegmentBuildTimeLeaseExtender.initExecutor();
        } catch (Throwable th) {
            if (0 != 0) {
                baseServerStarter.stop();
            }
            SegmentBuildTimeLeaseExtender.initExecutor();
            throw th;
        }
    }

    @Test
    public void testUpsertCompaction() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        upsertConfig.setEnableSnapshot(true);
        TableConfig tableConfig = setupTable("gameScoresWithCompaction", getKafkaTopic() + "-with-compaction", INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
        tableConfig.setTaskConfig(getCompactionTaskConfig());
        updateTableConfig(tableConfig);
        waitForAllDocsLoaded("gameScoresWithCompaction", 600000L, 1000L);
        Assert.assertEquals(getScore("gameScoresWithCompaction"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompaction", 10000L, 3L);
        Assert.assertNotNull(this._taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("gameScoresWithCompaction")).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded("gameScoresWithCompaction", 600000L, 3L);
        Assert.assertEquals(getScore("gameScoresWithCompaction"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompaction", 10000L, 3L);
        dropRealtimeTable("gameScoresWithCompaction");
    }

    @Test
    public void testUpsertCompactionDeletesSegments() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        upsertConfig.setEnableSnapshot(true);
        String str = getKafkaTopic() + "-with-compaction-segment-delete";
        TableConfig tableConfig = setupTable("gameScoresWithCompactionDeleteSegments", str, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
        tableConfig.setTaskConfig(getCompactionTaskConfig());
        updateTableConfig(tableConfig);
        pushCsvIntoKafka((File) unpackTarData(INPUT_DATA_LARGE_TAR_FILE, this._tempDir).get(0), str, 0);
        waitForAllDocsLoaded("gameScoresWithCompactionDeleteSegments", 600000L, 2000L);
        Assert.assertEquals(getScore("gameScoresWithCompactionDeleteSegments"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionDeleteSegments", 10000L, 5L);
        Assert.assertNotNull(this._taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("gameScoresWithCompactionDeleteSegments")).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded("gameScoresWithCompactionDeleteSegments", 600000L, 3L);
        Assert.assertEquals(getScore("gameScoresWithCompactionDeleteSegments"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionDeleteSegments", 10000L, 3L);
        dropRealtimeTable("gameScoresWithCompactionDeleteSegments");
    }

    @Test
    public void testUpsertCompactionWithSoftDelete() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        String str = getKafkaTopic() + "-with-compaction-delete";
        TableConfig tableConfig = setupTable("gameScoresWithCompactionWithSoftDelete", str, INPUT_DATA_LARGE_TAR_FILE, upsertConfig);
        Map configsForTaskType = getCompactionTaskConfig().getConfigsForTaskType("UpsertCompactionTask");
        configsForTaskType.put("validDocIdsType", ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString());
        tableConfig.setTaskConfig(new TableTaskConfig(Collections.singletonMap("UpsertCompactionTask", configsForTaskType)));
        updateTableConfig(tableConfig);
        List unpackTarData = unpackTarData(INPUT_DATA_LARGE_TAR_FILE, this._tempDir);
        pushCsvIntoKafka((File) unpackTarData.get(0), str, 0);
        waitForAllDocsLoaded("gameScoresWithCompactionWithSoftDelete", 600000L, 2000L);
        Assert.assertEquals(getScore("gameScoresWithCompactionWithSoftDelete"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionWithSoftDelete", 10000L, 5L);
        Assert.assertEquals(queryCountStar("gameScoresWithCompactionWithSoftDelete"), 3L);
        pushCsvIntoKafka(ImmutableList.of("102,Clifford,counter-strike,102,1681254200000,true", "100,Zook,counter-strike,2050,1681377200000,true"), str, 0);
        waitForAllDocsLoaded("gameScoresWithCompactionWithSoftDelete", 600000L, 2002L);
        Assert.assertEquals(queryCountStar("gameScoresWithCompactionWithSoftDelete"), 1L);
        Assert.assertNotNull(this._taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("gameScoresWithCompactionWithSoftDelete")).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded("gameScoresWithCompactionWithSoftDelete", 600000L, 3L);
        Assert.assertEquals(getScore("gameScoresWithCompactionWithSoftDelete"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionWithSoftDelete", 10000L, 2L);
        Assert.assertEquals(queryCountStar("gameScoresWithCompactionWithSoftDelete"), 1L);
        pushCsvIntoKafka((File) unpackTarData.get(0), str, 0);
        waitForAllDocsLoaded("gameScoresWithCompactionWithSoftDelete", 600000L, 1003L);
        Assert.assertEquals(getScore("gameScoresWithCompactionWithSoftDelete"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionWithSoftDelete", 10000L, 4L);
        Assert.assertEquals(queryCountStar("gameScoresWithCompactionWithSoftDelete"), 1L);
        Assert.assertEquals(getNumDeletedRows("gameScoresWithCompactionWithSoftDelete"), 2L);
        Assert.assertNotNull(this._taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType("gameScoresWithCompactionWithSoftDelete")).get("UpsertCompactionTask"));
        waitForTaskToComplete();
        waitForAllDocsLoaded("gameScoresWithCompactionWithSoftDelete", 600000L, 3L);
        Assert.assertEquals(getScore("gameScoresWithCompactionWithSoftDelete"), 3692L);
        waitForNumQueriedSegmentsToConverge("gameScoresWithCompactionWithSoftDelete", 10000L, 2L);
        Assert.assertEquals(queryCountStar("gameScoresWithCompactionWithSoftDelete"), 1L);
        Assert.assertEquals(getNumDeletedRows("gameScoresWithCompactionWithSoftDelete"), 0L);
        dropRealtimeTable("gameScoresWithCompactionWithSoftDelete");
    }

    private long getScore(String str) {
        return getPinotConnection().execute("SELECT score FROM " + str + " WHERE playerId = 101").getResultSet(0).getFloat(0);
    }

    private long getNumSegmentsQueried(String str) {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + str).getExecutionStats().getNumSegmentsQueried();
    }

    private long getNumDeletedRows(String str) {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + str + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
    }

    private void waitForTaskToComplete() {
        TestUtils.waitForCondition(r4 -> {
            Iterator it = this._helixTaskResourceManager.getTaskStates("UpsertCompactionTask").values().iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.COMPLETED) {
                    return false;
                }
            }
            return true;
        }, 600000L, "Failed to complete task");
    }

    private TableTaskConfig getCompactionTaskConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("bufferTimePeriod", "0d");
        hashMap.put("invalidRecordsThresholdCount", "1");
        return new TableTaskConfig(Collections.singletonMap("UpsertCompactionTask", hashMap));
    }
}
