package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.class */
/**
 * Integration test that uploads pre-built segments into a REALTIME upsert table and then checks
 * both the query results and the segment assignment recorded in the table's ideal state.
 *
 * <p>The test starts ZooKeeper, a controller, a broker, {@value #NUM_SERVERS} servers and Kafka,
 * pushes the Avro data into Kafka, builds segments from the same Avro files and uploads them as
 * REALTIME segments.
 */
public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
    /** Number of servers started for the test cluster. */
    private static final int NUM_SERVERS = 2;

    /** Column used both as the upsert primary key and as the partition column. */
    private static final String PRIMARY_KEY_COL = "clientId";

    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType("mytable");

    // Names of the uploaded segments. The trailing " %" is part of the segment name and must be
    // kept verbatim. getSegmentPartitionId() maps segment 1 to partition 0 and segments 2 and 3
    // to partition 1.
    private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
    private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
    private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";

    /** Value passed to {@link #waitForAllDocsLoaded(long)} by this test. */
    private static final long DOCS_LOADED_TIMEOUT_MS = 600_000L;

    /**
     * Brings up the cluster, creates the schema and upsert table, uploads the segments built from
     * the Avro files and waits until all documents are queryable.
     *
     * @throws Exception if any cluster component or upload step fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

        // Start the Pinot cluster.
        startZk();
        startController();
        startBroker();
        startServers(NUM_SERVERS);

        // The Avro files feed both the Kafka stream and the uploaded segments.
        List<File> avroFiles = unpackAvroData(_tempDir);
        startKafka();
        pushAvroIntoKafka(avroFiles);

        Schema schema = createSchema();
        addSchema(schema);
        TableConfig tableConfig =
            createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
        addTableConfig(tableConfig);

        ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
        uploadSegments(getTableName(), TableType.REALTIME, _tarDir);

        waitForAllDocsLoaded(DOCS_LOADED_TIMEOUT_MS);
    }

    /**
     * Drops every segment of the realtime table, waits for the segment list to become empty, then
     * drops the table and shuts the cluster down.
     *
     * @throws IOException if listing segments or deleting the temp directory fails
     */
    @AfterClass
    public void tearDown() throws IOException {
        final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());

        List<String> segments = listSegments(realtimeTableName);
        Assert.assertFalse(segments.isEmpty());
        for (String segment : segments) {
            dropSegment(realtimeTableName, segment);
        }

        // Segment deletion is not observed immediately; poll the segment list until it is empty.
        TestUtils.waitForCondition(ignored -> {
            try {
                return Boolean.valueOf(listSegments(realtimeTableName).isEmpty());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to drop the segments");

        dropRealtimeTable(realtimeTableName);
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(_tempDir);
    }

    @Override
    protected String getSchemaFileName() {
        return "upsert_table_test.schema";
    }

    // Kept public: the decompiler reported widening this member from protected.
    @Override
    public String getSchemaName() {
        return "upsertSchema";
    }

    @Override
    protected String getAvroTarFileName() {
        return "upsert_test.tar.gz";
    }

    // Kept public: the decompiler reported widening this member from protected.
    @Override
    public String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    /**
     * Returns the expected {@code COUNT(*)} for a regular query on the table.
     *
     * <p>Kept public: the decompiler reported widening this member from protected.
     */
    @Override
    public long getCountStarResult() {
        return 3L;
    }

    /**
     * Polls until the {@code skipUpsert=true} document count reaches
     * {@link #getCountStarResultWithoutUpsert()}, then asserts that the regular count equals
     * {@link #getCountStarResult()}.
     *
     * <p>Kept public: the decompiler reported widening this member from protected.
     *
     * @param timeoutMs forwarded to {@code TestUtils.waitForCondition} after the {@code 100L}
     *     argument; presumably the overall timeout in milliseconds (confirm against TestUtils)
     * @throws Exception if the wait or the final count query fails
     */
    @Override
    public void waitForAllDocsLoaded(long timeoutMs) throws Exception {
        TestUtils.waitForCondition(ignoredArg -> {
            try {
                return Boolean.valueOf(getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert());
            } catch (Exception ignored) {
                // Deliberately swallowed: a failed query is reported as null rather than aborting
                // the wait. NOTE(review): assumes waitForCondition treats null as "not yet met".
                return null;
            }
        }, 100L, timeoutMs, "Failed to load all documents");
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
    }

    /** Runs {@code COUNT(*)} with the {@code skipUpsert=true} query option and returns the result. */
    private long getCurrentCountStarResultWithoutUpsert() {
        String query = "SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)";
        return getPinotConnection().execute(query).getResultSet(0).getLong(0);
    }

    /** Expected {@code COUNT(*)} when the query is run with {@code skipUpsert=true}. */
    private long getCountStarResultWithoutUpsert() {
        return 600L;
    }

    /**
     * Verifies the ideal state and document counts initially, after running the realtime segment
     * validation manager, and again after restarting the servers.
     *
     * @throws Exception if any query or cluster operation fails
     */
    @Test
    public void testSegmentAssignment() throws Exception {
        verifyIdealState();

        // Running the validation manager must leave the assignment and the counts unchanged.
        _controllerStarter.getRealtimeSegmentValidationManager().run();
        verifyIdealState();
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
        Assert.assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());

        // The same must hold after a server restart.
        restartServers();
        verifyIdealState();
        waitForAllDocsLoaded(DOCS_LOADED_TIMEOUT_MS);
    }

    /**
     * Asserts that the ideal state of the realtime table holds exactly 5 segments, each assigned
     * to a single instance, that low-level-consumer segments are {@code CONSUMING} and all others
     * {@code ONLINE}, and that all segments of one partition (0 or 1) share the same instance.
     */
    private void verifyIdealState() {
        Map<String, Map<String, String>> segmentAssignment =
            HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME).getRecord().getMapFields();
        Assert.assertEquals(segmentAssignment.size(), 5);

        String instanceForPartition0 = null;
        String instanceForPartition1 = null;
        for (Map.Entry<String, Map<String, String>> segmentEntry : segmentAssignment.entrySet()) {
            String segmentName = segmentEntry.getKey();
            Map<String, String> instanceStateMap = segmentEntry.getValue();

            // Each segment must be assigned to exactly one instance.
            Assert.assertEquals(instanceStateMap.size(), 1);
            Map.Entry<String, String> instanceState = instanceStateMap.entrySet().iterator().next();
            String instanceName = instanceState.getKey();
            String state = instanceState.getValue();

            if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
                Assert.assertEquals(state, "CONSUMING");
            } else {
                Assert.assertEquals(state, "ONLINE");
            }

            // Remember the first instance seen per partition; later segments must match it.
            int partitionId = getSegmentPartitionId(segmentName);
            if (partitionId == 0) {
                if (instanceForPartition0 == null) {
                    instanceForPartition0 = instanceName;
                } else {
                    Assert.assertEquals(instanceName, instanceForPartition0);
                }
            } else {
                Assert.assertEquals(partitionId, 1);
                if (instanceForPartition1 == null) {
                    instanceForPartition1 = instanceName;
                } else {
                    Assert.assertEquals(instanceName, instanceForPartition1);
                }
            }
        }
    }

    /**
     * Returns the partition id of a segment.
     *
     * <p>The uploaded segments have fixed partitions: segment 1 is partition 0, segments 2 and 3
     * are partition 1. Any other name is parsed as an {@link LLCSegmentName} and its partition
     * group id is returned.
     *
     * @param segmentName name of the segment as it appears in the ideal state
     * @return the partition id of the segment
     */
    private static int getSegmentPartitionId(String segmentName) {
        switch (segmentName) {
            case UPLOADED_SEGMENT_1:
                return 0;
            case UPLOADED_SEGMENT_2:
            case UPLOADED_SEGMENT_3:
                return 1;
            default:
                return new LLCSegmentName(segmentName).getPartitionGroupId();
        }
    }
}
