package org.apache.pinot.integration.tests;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertTableIntegrationTest.class */
public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
    private static final String CSV_SCHEMA_HEADER = "playerId,name,game,score,timestampInEpoch,deleted";
    private static final String PARTIAL_UPSERT_TABLE_SCHEMA = "partial_upsert_table_test.schema";
    private static final String CSV_DELIMITER = ",";
    private static final String TABLE_NAME = "gameScores";
    private static final int NUM_SERVERS = 2;
    private static final String PRIMARY_KEY_COL = "playerId";
    protected static final String DELETE_COL = "deleted";

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServers(2);
        startKafka();
        pushCsvIntoKafka((File) unpackTarData(INPUT_DATA_TAR_FILE, this._tempDir).get(0), getKafkaTopic(), 0);
        addSchema(createSchema());
        addTableConfig(createCSVUpsertTableConfig(getTableName(), getSchemaName(), getKafkaTopic(), getNumKafkaPartitions(), getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER), null, PRIMARY_KEY_COL));
        waitForAllDocsLoaded(600000L);
        addSchema(createSchema(PARTIAL_UPSERT_TABLE_SCHEMA));
    }

    @AfterClass
    public void tearDown() throws IOException {
        dropRealtimeTable(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    protected String getSchemaFileName() {
        return "upsert_table_test.schema";
    }

    protected String getSchemaName() {
        return "playerScores";
    }

    @Nullable
    protected String getTimeColumnName() {
        return "timestampInEpoch";
    }

    protected String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    protected String getTableName() {
        return TABLE_NAME;
    }

    protected long getCountStarResult() {
        return 3L;
    }

    private Schema createSchema(String str) throws IOException {
        InputStream resourceAsStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(str);
        Assert.assertNotNull(resourceAsStream);
        return Schema.fromInputStream(resourceAsStream);
    }

    private long queryCountStarWithoutUpsert(String str) {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + str + " OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
    }

    private long getCountStarResultWithoutUpsert() {
        return 10L;
    }

    protected void waitForAllDocsLoaded(long j) throws Exception {
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(getTableName()) == getCountStarResultWithoutUpsert());
            } catch (Exception e) {
                return null;
            }
        }, 100L, j, "Failed to load all documents");
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
    }

    @Test
    protected void testDeleteWithFullUpsert() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        testDeleteWithFullUpsert(getKafkaTopic() + "-with-deletes", "gameScoresWithDelete", upsertConfig);
    }

    protected void testDeleteWithFullUpsert(String str, String str2, UpsertConfig upsertConfig) throws Exception {
        addTableConfig(createCSVUpsertTableConfig(str2, getSchemaName(), str, getNumKafkaPartitions(), getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER), upsertConfig, PRIMARY_KEY_COL));
        pushCsvIntoKafka((File) unpackTarData(INPUT_DATA_TAR_FILE, this._tempDir).get(0), str, 0);
        pushCsvIntoKafka(ImmutableList.of("102,Clifford,counter-strike,102,1681054200000,true", "100,Zook,counter-strike,2050,1681377200000,true"), str, 0);
        TestUtils.waitForCondition(r7 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(str2) == 12);
            } catch (Exception e) {
                return null;
            }
        }, 100L, 600000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
        ResultSet resultSet = getPinotConnection().execute("SELECT * FROM " + str2).getResultSet(0);
        Assert.assertEquals(resultSet.getRowCount(), 1);
        int columnCount = resultSet.getColumnCount();
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= columnCount) {
                break;
            }
            if (PRIMARY_KEY_COL.equalsIgnoreCase(resultSet.getColumnName(i2))) {
                i = i2;
                break;
            }
            i2++;
        }
        Assert.assertNotEquals(Integer.valueOf(i), -1);
        Assert.assertEquals(resultSet.getString(0, i), "101");
        ResultSet resultSet2 = getPinotConnection().execute("SELECT playerId FROM " + str2 + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
        Assert.assertEquals(resultSet2.getRowCount(), 2);
        for (int i3 = 0; i3 < resultSet2.getRowCount(); i3++) {
            String string = resultSet2.getString(i3, 0);
            Assert.assertTrue("100".equalsIgnoreCase(string) || "102".equalsIgnoreCase(string));
        }
        pushCsvIntoKafka(Collections.singletonList("100,Zook-New,,0.0,1684707335000,false"), str, 0);
        TestUtils.waitForCondition(r72 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(str2) == 13);
            } catch (Exception e) {
                return null;
            }
        }, 100L, 600000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
        ResultSet resultSet3 = getPinotConnection().execute("SELECT playerId, name, game FROM " + str2 + " WHERE playerId = 100").getResultSet(0);
        Assert.assertEquals(resultSet3.getRowCount(), 1);
        Assert.assertEquals(resultSet3.getInt(0, 0), 100);
        Assert.assertEquals(resultSet3.getString(0, 1), "Zook-New");
        Assert.assertEquals(resultSet3.getString(0, 2), "null");
        Assert.assertTrue(getPinotConnection().execute("SELECT playerId, name FROM " + str2 + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0).getRowCount() > 1);
        dropRealtimeTable(str2);
    }

    @Test
    public void testDeleteWithPartialUpsert() throws Exception {
        UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
        upsertConfig.setDeleteRecordColumn(DELETE_COL);
        testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes", "gameScoresPartialUpsertWithDelete", upsertConfig);
    }

    protected void testDeleteWithPartialUpsert(String str, String str2, UpsertConfig upsertConfig) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("game", UpsertConfig.Strategy.UNION);
        hashMap.put("score", UpsertConfig.Strategy.INCREMENT);
        hashMap.put(DELETE_COL, UpsertConfig.Strategy.OVERWRITE);
        upsertConfig.setPartialUpsertStrategies(hashMap);
        addTableConfig(createCSVUpsertTableConfig(str2, "playerScoresPartialUpsert", str, getNumKafkaPartitions(), getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER), upsertConfig, PRIMARY_KEY_COL));
        pushCsvIntoKafka((File) unpackTarData("gameScores_partial_upsert_csv.tar.gz", this._tempDir).get(0), str, 0);
        pushCsvIntoKafka(ImmutableList.of("102,Clifford,counter-strike,102,1681054200000,true", "100,Zook,counter-strike,2050,1681377200000,true"), str, 0);
        TestUtils.waitForCondition(r7 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(str2) == 12);
            } catch (Exception e) {
                return null;
            }
        }, 100L, 600000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
        ResultSet resultSet = getPinotConnection().execute("SELECT * FROM " + str2).getResultSet(0);
        Assert.assertEquals(resultSet.getRowCount(), 1);
        int columnCount = resultSet.getColumnCount();
        int i = -1;
        int i2 = 0;
        while (true) {
            if (i2 >= columnCount) {
                break;
            }
            if (PRIMARY_KEY_COL.equalsIgnoreCase(resultSet.getColumnName(i2))) {
                i = i2;
                break;
            }
            i2++;
        }
        Assert.assertNotEquals(Integer.valueOf(i), -1);
        Assert.assertEquals(resultSet.getString(0, i), "101");
        ResultSet resultSet2 = getPinotConnection().execute("SELECT playerId FROM " + str2 + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
        Assert.assertEquals(resultSet2.getRowCount(), 2);
        for (int i3 = 0; i3 < resultSet2.getRowCount(); i3++) {
            String string = resultSet2.getString(i3, 0);
            Assert.assertTrue("100".equalsIgnoreCase(string) || "102".equalsIgnoreCase(string));
        }
        pushCsvIntoKafka(Collections.singletonList("100,Zook,,0.0,1684707335000,false"), str, 0);
        TestUtils.waitForCondition(r72 -> {
            try {
                return Boolean.valueOf(queryCountStarWithoutUpsert(str2) == 13);
            } catch (Exception e) {
                return null;
            }
        }, 100L, 600000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
        ResultSet resultSet3 = getPinotConnection().execute("SELECT playerId, name, game FROM " + str2 + " WHERE playerId = 100").getResultSet(0);
        Assert.assertEquals(resultSet3.getRowCount(), 1);
        Assert.assertEquals(resultSet3.getInt(0, 0), 100);
        Assert.assertEquals(resultSet3.getString(0, 1), "Zook");
        Assert.assertEquals(resultSet3.getString(0, 2), "[\"null\"]");
        Assert.assertTrue(getPinotConnection().execute("SELECT playerId, name FROM " + str2 + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0).getRowCount() > 1);
        dropRealtimeTable(str2);
    }
}
