package org.apache.pinot.integration.tests;

import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.class */
public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
    private static final int NUM_SERVERS = 1;
    private static final String PRIMARY_KEY_COL = "clientId";
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType("mytable");
    private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
    private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
    private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServers(1);
        startKafka();
        populateTables();
    }

    protected void populateTables() throws Exception {
        List unpackAvroData = unpackAvroData(this._tempDir);
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createUpsertTableConfig = createUpsertTableConfig((File) unpackAvroData.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
        Assert.assertNotNull(createUpsertTableConfig.getUpsertConfig());
        addTableConfig(createUpsertTableConfig);
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(unpackAvroData, createUpsertTableConfig, createSchema, 0, this._segmentDir, this._tarDir);
        uploadSegments(getTableName(), TableType.REALTIME, this._tarDir);
        pushAvroIntoKafka(unpackAvroData);
        waitForAllDocsLoaded(600000L);
    }

    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty("pinot.server.instance.max.segment.preload.threads", "1");
        pinotConfiguration.setProperty(Joiner.on(".").join("pinot.server.instance", "upsert", new Object[]{"default.enable.snapshot"}), "true");
        pinotConfiguration.setProperty(Joiner.on(".").join("pinot.server.instance", "upsert", new Object[]{"default.enable.preload"}), "true");
    }

    @AfterClass
    public void tearDown() throws IOException {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        List listSegments = listSegments(tableNameWithType);
        Assert.assertFalse(listSegments.isEmpty());
        Iterator it = listSegments.iterator();
        while (it.hasNext()) {
            dropSegment(tableNameWithType, (String) it.next());
        }
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(listSegments(tableNameWithType).isEmpty());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to drop the segments");
        dropRealtimeTable(tableNameWithType);
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }

    protected String getSchemaFileName() {
        return "upsert_upload_segment_test.schema";
    }

    protected String getAvroTarFileName() {
        return "upsert_upload_segment_test.tar.gz";
    }

    protected String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    protected long getCountStarResult() {
        return 3L;
    }

    protected void waitForAllDocsLoaded(long j) throws Exception {
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert());
            } catch (Exception e) {
                return null;
            }
        }, 100L, j, "Failed to load all documents");
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
    }

    private long getCurrentCountStarResultWithoutUpsert() {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)").getResultSet(0).getLong(0);
    }

    private long getCountStarResultWithoutUpsert() {
        return 600L;
    }

    @Test
    public void testSegmentAssignment() throws Exception {
        verifyIdealState(5);
        this._controllerStarter.getRealtimeSegmentValidationManager().run();
        verifyIdealState(5);
        Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
        Assert.assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
        waitForSnapshotCreation();
        restartServers();
        verifyIdealState(7);
        waitForAllDocsLoaded(600000L);
    }

    protected void waitForSnapshotCreation() throws Exception {
        String tableName = getTableName();
        sendPostRequest(this._controllerRequestURLBuilder.forTableForceCommit(tableName), null);
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        TestUtils.waitForCondition(r10 -> {
            Iterator it = this._serverStarters.iterator();
            while (it.hasNext()) {
                for (File file : new File(((BaseServerStarter) it.next()).getConfig().getProperty("pinot.server.instance.dataDir"), tableNameWithType).listFiles((file2, str) -> {
                    return str.startsWith(tableName) && !LLCSegmentName.isLLCSegment(str);
                })) {
                    if (!new File(new File(file, "v3"), "validdocids.bitmap.snapshot").exists()) {
                        return false;
                    }
                }
            }
            return true;
        }, 600000L, "Failed to verify snapshots");
    }

    protected void verifyIdealState(int i) {
        Map mapFields = HelixHelper.getTableIdealState(this._helixManager, REALTIME_TABLE_NAME).getRecord().getMapFields();
        Assert.assertEquals(mapFields.size(), i);
        String str = null;
        String str2 = null;
        int i2 = 0;
        Iterator it = mapFields.entrySet().iterator();
        while (it.hasNext()) {
            LLCSegmentName of = LLCSegmentName.of((String) ((Map.Entry) it.next()).getKey());
            if (of != null) {
                i2 = Math.max(i2, of.getSequenceNumber());
            }
        }
        for (Map.Entry entry : mapFields.entrySet()) {
            String str3 = (String) entry.getKey();
            Map map = (Map) entry.getValue();
            Assert.assertEquals(map.size(), 1);
            Map.Entry entry2 = (Map.Entry) map.entrySet().iterator().next();
            String str4 = (String) entry2.getValue();
            LLCSegmentName of2 = LLCSegmentName.of(str3);
            if (of2 == null) {
                Assert.assertEquals(str4, "ONLINE");
            } else if (of2.getSequenceNumber() < i2) {
                Assert.assertEquals(str4, "ONLINE");
            } else {
                Assert.assertEquals(str4, "CONSUMING");
            }
            String str5 = (String) entry2.getKey();
            int segmentPartitionId = getSegmentPartitionId(str3);
            if (segmentPartitionId != 0) {
                Assert.assertEquals(segmentPartitionId, 1);
                if (str2 == null) {
                    str2 = str5;
                } else {
                    Assert.assertEquals(str5, str2);
                }
            } else if (str == null) {
                str = str5;
            } else {
                Assert.assertEquals(str5, str);
            }
        }
    }

    private static int getSegmentPartitionId(String str) {
        boolean z = -1;
        switch (str.hashCode()) {
            case 529039136:
                if (str.equals(UPLOADED_SEGMENT_2)) {
                    z = true;
                    break;
                }
                break;
            case 2063250050:
                if (str.equals(UPLOADED_SEGMENT_1)) {
                    z = false;
                    break;
                }
                break;
            case 2137000167:
                if (str.equals(UPLOADED_SEGMENT_3)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return 0;
            case true:
            case SimpleMinionClusterIntegrationTest.NUM_TASKS /* 2 */:
                return 1;
            default:
                return new LLCSegmentName(str).getPartitionGroupId();
        }
    }
}
