package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.apache.spark.SparkContext;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SparkSegmentMetadataPushIntegrationTest.class */
public class SparkSegmentMetadataPushIntegrationTest extends BaseClusterIntegrationTest {
    private SparkContext _sparkContext;
    private final String _testTable = "mytable";
    private final String _testTableWithType = "mytable_OFFLINE";

    protected Map<String, String> getStreamConfigs() {
        return null;
    }

    protected String getSortedColumn() {
        return null;
    }

    protected List<String> getInvertedIndexColumns() {
        return null;
    }

    protected List<String> getNoDictionaryColumns() {
        return null;
    }

    protected List<String> getRangeIndexColumns() {
        return null;
    }

    protected List<String> getBloomFilterColumns() {
        return null;
    }

    @BeforeMethod
    public void setUpTest() throws IOException {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
    }

    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        startBroker();
        startServer();
        this._sparkContext = new SparkContext("local", SparkSegmentMetadataPushIntegrationTest.class.getName());
    }

    @Test
    public void testSparkSegmentMetadataPushWithoutConsistentPush() throws Exception {
        runMetadataPushWithoutConsistentPushTest(false);
    }

    @Test
    public void testSparkSegmentMetadataPushWithoutConsistentPushWithBatchSegmentUpload() throws Exception {
        runMetadataPushWithoutConsistentPushTest(true);
    }

    private void runMetadataPushWithoutConsistentPushTest(boolean z) throws Exception {
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        waitForEVToDisappear(createOfflineTableConfig.getTableName());
        addTableConfig(createOfflineTableConfig);
        List allAvroFiles = getAllAvroFiles();
        ClusterIntegrationTestUtils.buildSegmentFromAvro((File) allAvroFiles.get(0), createOfflineTableConfig, createSchema, "_no_consistent_push", this._segmentDir, this._tarDir);
        SparkSegmentMetadataPushJobRunner sparkSegmentMetadataPushJobRunner = new SparkSegmentMetadataPushJobRunner();
        SegmentGenerationJobSpec segmentGenerationJobSpec = new SegmentGenerationJobSpec();
        segmentGenerationJobSpec.setJobType("SegmentMetadataPush");
        segmentGenerationJobSpec.setInputDirURI(((File) allAvroFiles.get(0)).getParent());
        PushJobSpec pushJobSpec = new PushJobSpec();
        pushJobSpec.setPushParallelism(5);
        pushJobSpec.setPushAttempts(1);
        pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
        pushJobSpec.setBatchSegmentUpload(z);
        segmentGenerationJobSpec.setPushJobSpec(pushJobSpec);
        PinotFSSpec pinotFSSpec = new PinotFSSpec();
        pinotFSSpec.setScheme("file");
        pinotFSSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
        segmentGenerationJobSpec.setPinotFSSpecs(Lists.newArrayList(new PinotFSSpec[]{pinotFSSpec}));
        segmentGenerationJobSpec.setOutputDirURI(this._tarDir.getAbsolutePath());
        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName("mytable_OFFLINE");
        tableSpec.setTableConfigURI(this._controllerRequestURLBuilder.forUpdateTableConfig("mytable_OFFLINE"));
        segmentGenerationJobSpec.setTableSpec(tableSpec);
        PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
        pinotClusterSpec.setControllerURI(getControllerBaseApiUrl());
        segmentGenerationJobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
        sparkSegmentMetadataPushJobRunner.init(segmentGenerationJobSpec);
        sparkSegmentMetadataPushJobRunner.run();
        JsonNode segmentsList = getSegmentsList();
        Assert.assertEquals(segmentsList.size(), 1);
        String asText = segmentsList.get(0).asText();
        Assert.assertTrue(asText.endsWith("_no_consistent_push"));
        testCountStar(getNumDocs(asText));
    }

    @Test
    public void testSparkSegmentMetadataPushWithConsistentPushParallelism1() throws Exception {
        runMetadataPushWithConsistentDataPushTest(5, 1, false);
    }

    @Test
    public void testSparkSegmentMetadataPushWithConsistentPushParallelism1WithBatchSegmentUpload() throws Exception {
        runMetadataPushWithConsistentDataPushTest(5, 1, true);
    }

    @Test
    public void testSparkSegmentMetadataPushWithConsistentPushParallelism5() throws Exception {
        runMetadataPushWithConsistentDataPushTest(5, 5, false);
    }

    @Test
    public void testSparkSegmentMetadataPushWithConsistentPushParallelism5WithBatchSegmentUpload() throws Exception {
        runMetadataPushWithConsistentDataPushTest(5, 5, true);
    }

    @Test
    public void testSparkSegmentMetadataPushWithConsistentPushHigherParallelismThenSegments() throws Exception {
        runMetadataPushWithConsistentDataPushTest(1, 5, false);
    }

    private void runMetadataPushWithConsistentDataPushTest(int i, int i2, boolean z) throws Exception {
        Schema createSchema = createSchema();
        addSchema(createSchema);
        TableConfig createOfflineTableConfigWithConsistentPush = createOfflineTableConfigWithConsistentPush();
        waitForEVToDisappear(createOfflineTableConfigWithConsistentPush.getTableName());
        addTableConfig(createOfflineTableConfigWithConsistentPush);
        List allAvroFiles = getAllAvroFiles();
        String l = Long.toString(System.currentTimeMillis());
        for (int i3 = 0; i3 < i; i3++) {
            ClusterIntegrationTestUtils.buildSegmentFromAvro((File) allAvroFiles.get(i3), createOfflineTableConfigWithConsistentPush, createSchema, l, this._segmentDir, this._tarDir);
        }
        SparkSegmentMetadataPushJobRunner sparkSegmentMetadataPushJobRunner = new SparkSegmentMetadataPushJobRunner();
        SegmentGenerationJobSpec segmentGenerationJobSpec = new SegmentGenerationJobSpec();
        segmentGenerationJobSpec.setJobType("SegmentMetadataPush");
        PushJobSpec pushJobSpec = new PushJobSpec();
        pushJobSpec.setPushParallelism(i2);
        pushJobSpec.setPushAttempts(1);
        pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
        pushJobSpec.setBatchSegmentUpload(z);
        segmentGenerationJobSpec.setPushJobSpec(pushJobSpec);
        PinotFSSpec pinotFSSpec = new PinotFSSpec();
        pinotFSSpec.setScheme("file");
        pinotFSSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
        segmentGenerationJobSpec.setPinotFSSpecs(Lists.newArrayList(new PinotFSSpec[]{pinotFSSpec}));
        segmentGenerationJobSpec.setOutputDirURI(this._tarDir.getAbsolutePath());
        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName("mytable_OFFLINE");
        tableSpec.setTableConfigURI(this._controllerRequestURLBuilder.forUpdateTableConfig("mytable_OFFLINE"));
        segmentGenerationJobSpec.setTableSpec(tableSpec);
        PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
        pinotClusterSpec.setControllerURI(getControllerBaseApiUrl());
        segmentGenerationJobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
        sparkSegmentMetadataPushJobRunner.init(segmentGenerationJobSpec);
        sparkSegmentMetadataPushJobRunner.run();
        JsonNode segmentsList = getSegmentsList();
        Assert.assertEquals(segmentsList.size(), i);
        long j = 0;
        for (int i4 = 0; i4 < i; i4++) {
            String asText = segmentsList.get(i4).asText();
            Assert.assertTrue(asText.endsWith(l));
            j += getNumDocs(asText);
        }
        testCountStar(j);
        String sendGetRequest = sendGetRequest(ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()).forListAllSegmentLineages("mytable_OFFLINE", TableType.OFFLINE.toString()));
        Assert.assertTrue(sendGetRequest.contains("\"state\":\"COMPLETED\""));
        Assert.assertTrue(sendGetRequest.contains("\"segmentsFrom\":[]"));
        String extractSegmentsFromLineageKey = extractSegmentsFromLineageKey("segmentsTo", sendGetRequest);
        for (int i5 = 0; i5 < i; i5++) {
            Assert.assertTrue(extractSegmentsFromLineageKey.contains(segmentsList.get(i5).asText()));
        }
        ArrayList newArrayList = Lists.newArrayList();
        for (int i6 = 0; i6 < i; i6++) {
            newArrayList.add(segmentsList.get(i6).asText());
        }
        for (File file : this._tarDir.listFiles()) {
            FileUtils.deleteQuietly(file);
        }
        String l2 = Long.toString(System.currentTimeMillis());
        ClusterIntegrationTestUtils.buildSegmentFromAvro((File) allAvroFiles.get(i), createOfflineTableConfigWithConsistentPush, createSchema, l2, this._segmentDir, this._tarDir);
        Assert.assertEquals(this._tarDir.listFiles().length, 1);
        sparkSegmentMetadataPushJobRunner.init(segmentGenerationJobSpec);
        sparkSegmentMetadataPushJobRunner.run();
        JsonNode segmentsList2 = getSegmentsList();
        Assert.assertEquals(segmentsList2.size(), i + 1);
        String asText2 = segmentsList2.get(i).asText();
        Assert.assertTrue(asText2.endsWith(l2));
        testCountStar(getNumDocs(asText2));
    }

    private static String extractSegmentsFromLineageKey(String str, String str2) {
        int length;
        int indexOf;
        String str3 = "\"" + str + "\":[";
        int indexOf2 = str2.indexOf(str3);
        return (indexOf2 == -1 || (indexOf = str2.indexOf(93, (length = indexOf2 + str3.length()))) == -1 || length >= indexOf) ? "" : str2.substring(length, indexOf);
    }

    protected TableConfig createOfflineTableConfigWithConsistentPush() {
        TableConfig createOfflineTableConfig = createOfflineTableConfig();
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig((List) null, "REFRESH", "DAILY", true));
        createOfflineTableConfig.setIngestionConfig(ingestionConfig);
        return createOfflineTableConfig;
    }

    private long getNumDocs(String str) throws IOException {
        return JsonUtils.stringToJsonNode(sendGetRequest(this._controllerRequestURLBuilder.forSegmentMetadata("mytable", str))).get("segment.total.docs").asLong();
    }

    private JsonNode getSegmentsList() throws IOException {
        return JsonUtils.stringToJsonNode(sendGetRequest(this._controllerRequestURLBuilder.forSegmentListAPI("mytable", TableType.OFFLINE.toString()))).get(0).get("OFFLINE");
    }

    protected void testCountStar(long j) {
        TestUtils.waitForCondition(r8 -> {
            try {
                return Boolean.valueOf(getCurrentCountStarResult() == j);
            } catch (Exception e) {
                return null;
            }
        }, 100L, 300000L, "Failed to load " + j + " documents", true);
    }

    @AfterMethod
    public void tearDownTest() throws IOException {
        dropOfflineTable(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()));
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
    }

    @AfterClass
    public void tearDown() throws Exception {
        this._sparkContext.stop();
        stopServer();
        stopBroker();
        stopController();
        stopZk();
    }
}
