package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.core.type.TypeReference;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test that rebalances and reloads a REALTIME table configured for partial upsert
 * (see {@link #createSchemaAndTable()}), and checks that segment assignment and document counts
 * stay as expected.
 *
 * <p>Each test method is followed by {@link #afterMethod()}, which pauses consumption, drops every
 * segment of the table and restarts Kafka and the servers so the next test starts from an empty
 * table.
 */
public class PartialUpsertTableRebalanceIntegrationTest extends BaseClusterIntegrationTest {
    private static final int NUM_SERVERS = 1;
    private static final String PRIMARY_KEY_COL = "clientId";
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType("mytable");

    // Segment names that are mapped to a fixed partition id by getSegmentPartitionId(); every other
    // segment name is parsed as an LLC segment name.
    // NOTE(review): presumably these are the names of the segments uploaded by populateTables() - confirm.
    private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
    private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
    private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";

    // Timeout used for every "wait for rebalance / reload / documents" step in the tests.
    private static final long DEFAULT_TIMEOUT_MS = 600_000L;

    private PinotHelixResourceManager _resourceManager;
    private TableRebalancer _tableRebalancer;
    private static List<File> _avroFiles;
    private TableConfig _tableConfig;
    private Schema _schema;

    /**
     * Starts ZooKeeper, controller, broker, {@link #NUM_SERVERS} server(s) and Kafka, then creates the
     * schema and the partial-upsert table.
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        startZk();
        startController();
        startBroker();
        startServers(NUM_SERVERS);
        startKafka();
        _resourceManager = getControllerStarter().getHelixResourceManager();
        _tableRebalancer = new TableRebalancer(_resourceManager.getHelixZkManager());
        createSchemaAndTable();
    }

    /**
     * Adds two servers one at a time, rebalancing (including consuming segments) after each addition,
     * then untags both new servers and rebalances with downtime so that all segments end up on the
     * original server again. Finally stops and drops the two extra servers.
     */
    @Test
    public void testRebalance() throws Exception {
        populateTables();
        verifyIdealState(5, NUM_SERVERS);

        RebalanceConfig rebalanceConfig = new RebalanceConfig();
        rebalanceConfig.setDryRun(false);
        rebalanceConfig.setMinAvailableReplicas(0);
        rebalanceConfig.setIncludeConsuming(true);

        // Add the first extra server and rebalance onto it.
        BaseServerStarter serverStarter1 = startOneServer(1234);
        TableConfig tableConfig = _resourceManager.getTableConfig(REALTIME_TABLE_NAME);
        RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, (String) null);
        int numServers = _resourceManager.getServerInstancesForTable(getTableName(), TableType.REALTIME).size();
        Assert.assertEquals(numServers, 2, "Rebalancing didn't correctly add the new server");
        waitForRebalanceToComplete(rebalanceResult, DEFAULT_TIMEOUT_MS);
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS);
        verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5, numServers);

        // Add the second extra server and rebalance again.
        BaseServerStarter serverStarter2 = startOneServer(4567);
        rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, (String) null);
        Assert.assertEquals(_resourceManager.getServerInstancesForTable(getTableName(), TableType.REALTIME).size(), 3,
                "Rebalancing didn't correctly add the new server");
        waitForRebalanceToComplete(rebalanceResult, DEFAULT_TIMEOUT_MS);
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS);
        // The number of distinct servers hosting segments is expected to equal the number of Kafka partitions.
        verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5, getNumKafkaPartitions());

        // Remove the tags from both extra servers and move everything back with a downtime rebalance.
        _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "", false);
        _resourceManager.updateInstanceTags(serverStarter2.getInstanceId(), "", false);
        rebalanceConfig.setReassignInstances(true);
        rebalanceConfig.setDowntime(true);
        rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, (String) null);
        verifySegmentAssignment(rebalanceResult.getSegmentAssignment(), 5, NUM_SERVERS);
        waitForRebalanceToComplete(rebalanceResult, DEFAULT_TIMEOUT_MS);
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS);

        serverStarter1.stop();
        serverStarter2.stop();
        TestUtils.waitForCondition(
                aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
                        && _resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(),
                60_000L, "Failed to drop servers");
    }

    /**
     * Pushes the Avro data into Kafka, triggers a reload of the realtime table, waits for the reload
     * job to finish and verifies that the documents and the ideal state are still as expected.
     */
    @Test
    public void testReload() throws Exception {
        pushAvroIntoKafka(_avroFiles);
        // Only the Kafka data is ingested here (no uploaded segments), hence 300 documents.
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS, 300L);

        // The reload response carries a "status" string that embeds a JSON object keyed by table name.
        String reloadResponse = reloadRealtimeTable(getTableName());
        Map<String, String> reloadResponseJson =
                JsonUtils.stringToObject(reloadResponse, new TypeReference<Map<String, String>>() {
                });
        String status = reloadResponseJson.get("status");
        int jsonStartIndex = status.indexOf('{');
        Assert.assertTrue(jsonStartIndex >= 0, "Reload status does not contain a JSON payload: " + status);
        Map<String, Map<String, String>> reloadStatus = JsonUtils.stringToObject(status.substring(jsonStartIndex),
                new TypeReference<Map<String, Map<String, String>>>() {
                });
        String reloadJobId = reloadStatus.get(REALTIME_TABLE_NAME).get("reloadJobId");

        waitForReloadToComplete(reloadJobId, DEFAULT_TIMEOUT_MS);
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS, 300L);
        verifyIdealState(4, NUM_SERVERS);
    }

    /**
     * Resets the table after each test: pauses consumption, waits until no segment is consuming, drops
     * all segments, restarts Kafka and the servers, and resumes consumption.
     */
    @AfterMethod
    public void afterMethod() throws Exception {
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());

        getControllerRequestClient().pauseConsumption(realtimeTableName);
        TestUtils.waitForCondition(aVoid -> {
            try {
                return getControllerRequestClient().getPauseStatus(realtimeTableName).getConsumingSegments().isEmpty();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60_000L, "Failed to pause the consumption");

        for (String segmentName : listSegments(realtimeTableName)) {
            dropSegment(realtimeTableName, segmentName);
        }
        TestUtils.waitForCondition(aVoid -> {
            try {
                return listSegments(realtimeTableName).isEmpty();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60_000L, "Failed to drop the segments");

        stopKafka();
        restartServers();
        startKafka();
        getControllerRequestClient().resumeConsumption(realtimeTableName);
    }

    /**
     * Verifies a segment assignment (segment name -> instance -> state).
     *
     * <p>Checks that:
     * <ul>
     *   <li>the assignment contains exactly {@code numSegmentsExpected} segments;</li>
     *   <li>every segment is assigned to exactly one instance;</li>
     *   <li>LLC segments whose sequence number equals the highest sequence number found are
     *       {@code CONSUMING}, every other segment is {@code ONLINE};</li>
     *   <li>all segments of the same partition are on the same instance;</li>
     *   <li>exactly {@code numInstancesExpected} distinct instances host segments.</li>
     * </ul>
     *
     * @param segmentAssignment segment name -> (instance name -> segment state)
     * @param numSegmentsExpected expected number of segments in the assignment
     * @param numInstancesExpected expected number of distinct instances hosting segments
     */
    protected void verifySegmentAssignment(Map<String, Map<String, String>> segmentAssignment,
            int numSegmentsExpected, int numInstancesExpected) {
        Assert.assertEquals(segmentAssignment.size(), numSegmentsExpected);

        // Highest sequence number among the LLC segments; segments at that sequence are expected to be consuming.
        int maxSequenceNumber = 0;
        for (String segmentName : segmentAssignment.keySet()) {
            if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
                maxSequenceNumber = Math.max(maxSequenceNumber, new LLCSegmentName(segmentName).getSequenceNumber());
            }
        }

        Map<Integer, String> partitionIdToInstance = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
            String segmentName = entry.getKey();
            Map<String, String> instanceStateMap = entry.getValue();
            Assert.assertEquals(instanceStateMap.size(), 1);
            Map.Entry<String, String> instanceState = instanceStateMap.entrySet().iterator().next();

            String expectedState = "ONLINE";
            if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
                    && new LLCSegmentName(segmentName).getSequenceNumber() >= maxSequenceNumber) {
                expectedState = "CONSUMING";
            }
            Assert.assertEquals(instanceState.getValue(), expectedState);

            // The first segment seen for a partition pins the instance; later ones must match it.
            String instanceName = instanceState.getKey();
            int partitionId = getSegmentPartitionId(segmentName);
            Assert.assertEquals(instanceName, partitionIdToInstance.computeIfAbsent(partitionId, k -> instanceName));
        }

        // Every segment's instance equals its partition's pinned instance (asserted above), so the distinct
        // values of the map are exactly the distinct instances hosting segments.
        Assert.assertEquals(new HashSet<>(partitionIdToInstance.values()).size(), numInstancesExpected);
    }

    /**
     * Runs {@link #verifySegmentAssignment(Map, int, int)} against the current ideal state of the
     * realtime table.
     *
     * @param numSegmentsExpected expected number of segments in the ideal state
     * @param numInstancesExpected expected number of distinct instances hosting segments
     */
    protected void verifyIdealState(int numSegmentsExpected, int numInstancesExpected) {
        verifySegmentAssignment(
                HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME).getRecord().getMapFields(),
                numSegmentsExpected, numInstancesExpected);
    }

    /**
     * Builds segments from the Avro files and uploads them to the realtime table, pushes the same Avro
     * files into Kafka, and waits until all documents are loaded.
     */
    protected void populateTables() throws Exception {
        ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir);
        uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
        pushAvroIntoKafka(_avroFiles);
        waitForAllDocsLoaded(DEFAULT_TIMEOUT_MS);
    }

    /**
     * Unpacks the Avro data, registers the schema and creates the upsert table configured with
     * {@code PARTIAL} mode, an {@code OVERWRITE} default strategy, no per-column strategies and null
     * handling enabled.
     */
    protected void createSchemaAndTable() throws Exception {
        _avroFiles = unpackAvroData(_tempDir);
        _schema = createSchema();
        addSchema(_schema);

        _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
        _tableConfig.getValidationConfig().setDeletedSegmentsRetentionPeriod((String) null);
        UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
        upsertConfig.setMode(UpsertConfig.Mode.PARTIAL);
        upsertConfig.setPartialUpsertStrategies(new HashMap<>());
        upsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
        _tableConfig.getIndexingConfig().setNullHandlingEnabled(true);
        addTableConfig(_tableConfig);
    }

    /** Stops the server, broker, controller, Kafka and ZooKeeper started in {@link #setUp()}. */
    @AfterClass
    public void tearDown() {
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
    }

    /** Returns the name of the schema file used by this test. */
    protected String getSchemaFileName() {
        return "upsert_upload_segment_test.schema";
    }

    /** Returns the name of the Avro tarball used by this test. */
    protected String getAvroTarFileName() {
        return "upsert_upload_segment_test.tar.gz";
    }

    /** Returns the partition column, which is the primary key column. */
    protected String getPartitionColumn() {
        return PRIMARY_KEY_COL;
    }

    /** Returns the expected {@code COUNT(*)} result with upsert applied. */
    protected long getCountStarResult() {
        return 3L;
    }

    /**
     * Polls once per second until {@code COUNT(*)} with {@code skipUpsert=true} equals the expected
     * value.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @param expectedCountStarWithoutUpsertResult expected document count when upsert is skipped
     */
    protected void waitForAllDocsLoaded(long timeoutMs, long expectedCountStarWithoutUpsertResult) throws Exception {
        TestUtils.waitForCondition(aVoid -> {
            try {
                return getCurrentCountStarResultWithoutUpsert() == expectedCountStarWithoutUpsertResult;
            } catch (Exception e) {
                // Query failures are reported as "no result" so that polling continues.
                // NOTE(review): relies on waitForCondition tolerating a null result - confirm.
                return null;
            }
        }, 1000L, timeoutMs, "Failed to load all documents");
    }

    /**
     * Returns the partition id of a segment: a fixed id for the known uploaded segment names, and the
     * partition group id parsed from the LLC segment name otherwise.
     *
     * @param segmentName name of the segment
     * @return partition id the segment belongs to
     */
    private static int getSegmentPartitionId(String segmentName) {
        switch (segmentName) {
            case UPLOADED_SEGMENT_1:
                return 0;
            case UPLOADED_SEGMENT_2:
            case UPLOADED_SEGMENT_3:
                return 1;
            default:
                return new LLCSegmentName(segmentName).getPartitionGroupId();
        }
    }

    /**
     * If the rebalance is still in progress, polls the controller's rebalance status endpoint once per
     * second until the job is no longer {@code IN_PROGRESS}. Returns immediately otherwise.
     *
     * @param rebalanceResult result returned by the rebalance call
     * @param timeoutMs maximum time to wait, in milliseconds
     */
    protected void waitForRebalanceToComplete(RebalanceResult rebalanceResult, long timeoutMs) throws Exception {
        String jobId = rebalanceResult.getJobId();
        if (rebalanceResult.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
            return;
        }
        TestUtils.waitForCondition(aVoid -> {
            try {
                String requestUrl = getControllerRequestURLBuilder().forTableRebalanceStatus(jobId);
                ServerRebalanceJobStatusResponse jobStatus = JsonUtils.stringToObject(
                        HttpClient.wrapAndThrowHttpException(
                                getHttpClient().sendGetRequest(new URL(requestUrl).toURI(), (Map) null)).getResponse(),
                        ServerRebalanceJobStatusResponse.class);
                return jobStatus.getTableRebalanceProgressStats().getStatus() != RebalanceResult.Status.IN_PROGRESS;
            } catch (Exception e) {
                // Any failure (HTTP error, bad URL, parse error, ...) is reported as "no result" so polling continues.
                // NOTE(review): relies on waitForCondition tolerating a null result - confirm.
                return null;
            }
        }, 1000L, timeoutMs, "Failed to load all segments after rebalance");
    }

    /**
     * Polls the controller's segment reload status endpoint once per second until the number of
     * successfully reloaded segments equals the total segment count of the job.
     *
     * @param reloadJobId id of the reload job to wait for
     * @param timeoutMs maximum time to wait, in milliseconds
     */
    protected void waitForReloadToComplete(String reloadJobId, long timeoutMs) throws Exception {
        TestUtils.waitForCondition(aVoid -> {
            try {
                String requestUrl = getControllerRequestURLBuilder().forSegmentReloadStatus(reloadJobId);
                ServerReloadControllerJobStatusResponse jobStatus = JsonUtils.stringToObject(
                        HttpClient.wrapAndThrowHttpException(
                                _httpClient.sendGetRequest(new URL(requestUrl).toURI(), (Map) null)).getResponse(),
                        ServerReloadControllerJobStatusResponse.class);
                return jobStatus.getSuccessCount() == jobStatus.getTotalSegmentCount();
            } catch (Exception e) {
                // Any failure (HTTP error, bad URL, parse error, ...) is reported as "no result" so polling continues.
                // NOTE(review): relies on waitForCondition tolerating a null result - confirm.
                return null;
            }
        }, 1000L, timeoutMs, "Failed to load all segments after reload");
    }

    /**
     * Polls every 100 ms until all three counts match: {@code COUNT(*)} without upsert, {@code COUNT(*)}
     * with upsert, and {@code COUNT(*)} without upsert restricted to rows whose single-value columns
     * are all non-null.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     */
    protected void waitForAllDocsLoaded(long timeoutMs) throws Exception {
        TestUtils.waitForCondition(aVoid -> {
            try {
                return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert()
                        && getCurrentCountStarResult() == getCountStarResult()
                        && getCurrentCountStarResultWithoutNulls(getTableName(), _schema)
                        == getCountStarResultWithoutUpsert();
            } catch (Exception e) {
                // Query failures are reported as "no result" so that polling continues.
                // NOTE(review): relies on waitForCondition tolerating a null result - confirm.
                return null;
            }
        }, 100L, timeoutMs, "Failed to load all documents");
    }

    /** Queries the current {@code COUNT(*)} of the table with {@code skipUpsert=true}. */
    private long getCurrentCountStarResultWithoutUpsert() {
        return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
                .getResultSet(0).getLong(0);
    }

    /** Returns the expected {@code COUNT(*)} result when upsert is skipped. */
    private long getCountStarResultWithoutUpsert() {
        return 600L;
    }

    /**
     * Queries, with {@code skipUpsert=true}, the number of rows in which every single-value column of
     * the schema is non-null. When the schema has no single-value column, no {@code WHERE} clause is
     * added.
     *
     * @param tableName table to query
     * @param schema schema whose single-value columns are checked for non-null values
     * @return the count, or {@code 0} when the query returns no result set
     */
    protected long getCurrentCountStarResultWithoutNulls(String tableName, Schema schema) {
        StringBuilder query = new StringBuilder("SELECT COUNT(*) FROM ").append(tableName);
        // First predicate is introduced by WHERE, subsequent ones are joined with AND.
        String separator = " WHERE ";
        for (String column : schema.getColumnNames()) {
            if (schema.getFieldSpecFor(column).isSingleValueField()) {
                query.append(separator).append(column).append(" IS NOT NULL");
                separator = " AND ";
            }
        }
        query.append(" OPTION(skipUpsert=true)");

        ResultSetGroup resultSetGroup = getPinotConnection().execute(query.toString());
        if (resultSetGroup.getResultSetCount() > 0) {
            return resultSetGroup.getResultSet(0).getLong(0);
        }
        return 0L;
    }
}
