package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.class */
/**
 * Integration test that pushes offline segments to a running cluster with the standalone
 * {@link SegmentMetadataPushJobRunner} and {@link SegmentTarPushJobRunner}, then checks the table directory
 * under the controller data dir, the controller's segment list, the segment lineage and the count-star result.
 *
 * <p>Every test method works on its own table: a random table-name suffix is generated before each method and
 * the offline table is dropped after it.
 */
public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
    private static final String LOCAL_FS_SCHEME = "file";
    private static final String LOCAL_FS_CLASS_NAME = "org.apache.pinot.spi.filesystem.LocalPinotFS";
    /** Number of segments built and pushed by the batch-mode test. */
    private static final int NUM_SEGMENTS_IN_BATCH = 12;
    private static final long COUNT_STAR_CHECK_INTERVAL_MS = 100L;
    private static final long COUNT_STAR_TIMEOUT_MS = 300000L;

    /** Random suffix appended to the table name; regenerated before every test method. */
    private static String _tableNameSuffix;

    // The hooks below all return null.
    // NOTE(review): presumably they replace defaults of the base class so that the table is created without
    // stream configs, sorted column or extra indexes -- confirm against BaseClusterIntegrationTest.

    protected Map<String, String> getStreamConfigs() {
        return null;
    }

    protected String getSortedColumn() {
        return null;
    }

    protected List<String> getInvertedIndexColumns() {
        return null;
    }

    protected List<String> getNoDictionaryColumns() {
        return null;
    }

    protected List<String> getRangeIndexColumns() {
        return null;
    }

    protected List<String> getBloomFilterColumns() {
        return null;
    }

    /**
     * Picks a fresh table-name suffix and resets the temp, segment and tar directories.
     *
     * @throws IOException if the directories cannot be prepared
     */
    @BeforeMethod
    public void setUpTest() throws IOException {
        _tableNameSuffix = RandomStringUtils.randomAlphabetic(12);
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
    }

    /** Starts ZooKeeper, controller, broker and server once for the whole class. */
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        startBroker();
        startServer();
    }

    /**
     * Pushes one segment with copy-to-deep-store enabled, then a second one with a default push spec, and
     * verifies the controller data dir, the segment list and the count-star result after each push.
     */
    @Test
    public void testUploadAndQuery() throws Exception {
        // Register the schema and the offline table.
        Schema schema = createSchema();
        addSchema(schema);
        TableConfig offlineTableConfig = createOfflineTableConfig();
        waitForEVToDisappear(offlineTableConfig.getTableName());
        addTableConfig(offlineTableConfig);

        List<File> avroFiles = getAllAvroFiles();

        // Phase 1: metadata push with copy-to-deep-store enabled.
        ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, "_with_move",
                _segmentDir, _tarDir);
        SegmentGenerationJobSpec jobSpec = newJobSpec(copyToDeepStorePushSpec(), getTableName());

        File tableDataDir = controllerTableDataDir();
        Assert.assertFalse(tableDataDir.exists());
        Assert.assertEquals(countFiles(_tarDir), 1);

        runMetadataPush(jobSpec);

        // One file is expected under the controller data dir; the tar dir still holds its file.
        Assert.assertTrue(tableDataDir.exists());
        Assert.assertEquals(countFiles(tableDataDir), 1);
        Assert.assertEquals(countFiles(_tarDir), 1);

        JsonNode segmentsAfterFirstPush = getSegmentsList();
        Assert.assertEquals(segmentsAfterFirstPush.size(), 1);
        String firstSegmentName = segmentsAfterFirstPush.get(0).asText();
        Assert.assertTrue(firstSegmentName.endsWith("_with_move"));
        long firstSegmentDocs = getNumDocs(firstSegmentName);
        testCountStar(firstSegmentDocs);

        // Phase 2: rebuild from the second Avro file and push with a default push spec.
        clearSegmentAndTarDirs();
        ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema,
                "_without_move", _segmentDir, _tarDir);
        jobSpec.setPushJobSpec(new PushJobSpec());

        Assert.assertEquals(countFiles(tableDataDir), 1);
        Assert.assertEquals(countFiles(_tarDir), 1);

        runMetadataPush(jobSpec);

        // The controller data dir is expected to be unchanged by this push.
        Assert.assertEquals(countFiles(tableDataDir), 1);
        Assert.assertEquals(countFiles(_tarDir), 1);

        JsonNode segmentsAfterSecondPush = getSegmentsList();
        Assert.assertEquals(segmentsAfterSecondPush.size(), 2);
        String secondSegmentName = findSegmentEndingWith(segmentsAfterSecondPush, "_without_move");
        Assert.assertNotNull(secondSegmentName);
        testCountStar(firstSegmentDocs + getNumDocs(secondSegmentName));
    }

    /**
     * Builds {@value #NUM_SEGMENTS_IN_BATCH} segments, pushes them in one metadata push with batch segment
     * upload enabled, and verifies that all of them are listed and queryable.
     */
    @Test
    public void testUploadMultipleSegmentsInBatchModeAndQuery() throws Exception {
        // Register the schema and the offline table.
        Schema schema = createSchema();
        addSchema(schema);
        TableConfig offlineTableConfig = createOfflineTableConfig();
        waitForEVToDisappear(offlineTableConfig.getTableName());
        addTableConfig(offlineTableConfig);

        List<File> avroFiles = getAllAvroFiles();
        for (int segmentIndex = 0; segmentIndex < NUM_SEGMENTS_IN_BATCH; segmentIndex++) {
            ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segmentIndex), offlineTableConfig, schema,
                    "_seg" + segmentIndex, _segmentDir, _tarDir);
        }

        PushJobSpec batchPushSpec = copyToDeepStorePushSpec();
        batchPushSpec.setBatchSegmentUpload(true);
        // This test hands the type-suffixed table name to the job spec.
        SegmentGenerationJobSpec jobSpec = newJobSpec(batchPushSpec, getTableName() + "_OFFLINE");

        File tableDataDir = controllerTableDataDir();
        Assert.assertFalse(tableDataDir.exists());
        Assert.assertEquals(countFiles(_tarDir), NUM_SEGMENTS_IN_BATCH);

        runMetadataPush(jobSpec);

        Assert.assertTrue(tableDataDir.exists());
        Assert.assertEquals(countFiles(tableDataDir), NUM_SEGMENTS_IN_BATCH);
        Assert.assertEquals(countFiles(_tarDir), NUM_SEGMENTS_IN_BATCH);

        JsonNode segments = getSegmentsList();
        Assert.assertEquals(segments.size(), NUM_SEGMENTS_IN_BATCH);
        long totalDocs = 0;
        for (JsonNode segment : segments) {
            totalDocs += getNumDocs(segment.asText());
        }
        testCountStar(totalDocs);

        clearSegmentAndTarDirs();
    }

    /**
     * Pushes two segments to a table created by {@link #createOfflineTableConfigWithConsistentPush()} -- the
     * first with a metadata push, the second with a tar push -- and verifies the segment lineage entry
     * reported by the controller after each push.
     */
    @Test
    public void testUploadAndQueryWithConsistentPush() throws Exception {
        // Register the schema and the offline table.
        Schema schema = createSchema();
        addSchema(schema);
        TableConfig offlineTableConfig = createOfflineTableConfigWithConsistentPush();
        waitForEVToDisappear(offlineTableConfig.getTableName());
        addTableConfig(offlineTableConfig);

        List<File> avroFiles = getAllAvroFiles();

        // Phase 1: metadata push; the current time in millis is used as the segment name postfix.
        String firstPostfix = Long.toString(System.currentTimeMillis());
        ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, firstPostfix,
                _segmentDir, _tarDir);
        SegmentGenerationJobSpec jobSpec = newJobSpec(copyToDeepStorePushSpec(), getTableName());

        File tableDataDir = controllerTableDataDir();
        Assert.assertEquals(countFiles(_tarDir), 1);

        runMetadataPush(jobSpec);

        Assert.assertTrue(tableDataDir.exists());
        Assert.assertEquals(countFiles(tableDataDir), 1);
        Assert.assertEquals(countFiles(_tarDir), 1);

        JsonNode segmentsAfterFirstPush = getSegmentsList();
        Assert.assertEquals(segmentsAfterFirstPush.size(), 1);
        String firstSegmentName = segmentsAfterFirstPush.get(0).asText();
        Assert.assertTrue(firstSegmentName.endsWith(firstPostfix));
        testCountStar(getNumDocs(firstSegmentName));

        // Expected lineage: a COMPLETED entry going from no segments to the first segment.
        String lineageAfterFirstPush = fetchSegmentLineage();
        Assert.assertTrue(lineageAfterFirstPush.contains("\"state\":\"COMPLETED\""));
        Assert.assertTrue(lineageAfterFirstPush.contains("\"segmentsFrom\":[]"));
        Assert.assertTrue(lineageAfterFirstPush.contains("\"segmentsTo\":[\"" + firstSegmentName + "\"]"));

        // Phase 2: rebuild from the second Avro file and push it with the tar push runner.
        clearSegmentAndTarDirs();
        String secondPostfix = Long.toString(System.currentTimeMillis());
        ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, secondPostfix,
                _segmentDir, _tarDir);
        jobSpec.setPushJobSpec(new PushJobSpec());

        Assert.assertEquals(countFiles(tableDataDir), 1);
        Assert.assertEquals(countFiles(_tarDir), 1);

        SegmentTarPushJobRunner tarPushRunner = new SegmentTarPushJobRunner();
        tarPushRunner.init(jobSpec);
        tarPushRunner.run();

        Assert.assertEquals(countFiles(_tarDir), 1);

        // Both segments are listed, but count-star is expected to match the second segment only.
        JsonNode segmentsAfterSecondPush = getSegmentsList();
        Assert.assertEquals(segmentsAfterSecondPush.size(), 2);
        String secondSegmentName = findSegmentEndingWith(segmentsAfterSecondPush, secondPostfix);
        Assert.assertNotNull(secondSegmentName);
        testCountStar(getNumDocs(secondSegmentName));

        // Expected lineage: a COMPLETED entry going from the first segment to the second one.
        String lineageAfterSecondPush = fetchSegmentLineage();
        Assert.assertTrue(lineageAfterSecondPush.contains("\"state\":\"COMPLETED\""));
        Assert.assertTrue(lineageAfterSecondPush.contains("\"segmentsFrom\":[\"" + firstSegmentName + "\"]"));
        Assert.assertTrue(lineageAfterSecondPush.contains("\"segmentsTo\":[\"" + secondSegmentName + "\"]"));
    }

    /**
     * Creates the offline table config and attaches a batch ingestion config built from
     * {@code (null, "REFRESH", "DAILY", true)}.
     *
     * @return the offline table config with the ingestion config set
     */
    protected TableConfig createOfflineTableConfigWithConsistentPush() {
        TableConfig offlineTableConfig = createOfflineTableConfig();
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
        offlineTableConfig.setIngestionConfig(ingestionConfig);
        return offlineTableConfig;
    }

    /** Returns a new push spec with copy-to-deep-store for metadata push switched on. */
    private PushJobSpec copyToDeepStorePushSpec() {
        PushJobSpec pushJobSpec = new PushJobSpec();
        pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
        return pushJobSpec;
    }

    /**
     * Assembles the job spec shared by all tests: local file system, tar dir as output dir, table config URI
     * of the test table and the test controller as the only cluster.
     *
     * @param pushJobSpec push settings to attach to the job spec
     * @param specTableName table name to put into the table spec
     * @return the assembled job spec
     */
    private SegmentGenerationJobSpec newJobSpec(PushJobSpec pushJobSpec, String specTableName) {
        PinotFSSpec fsSpec = new PinotFSSpec();
        fsSpec.setScheme(LOCAL_FS_SCHEME);
        fsSpec.setClassName(LOCAL_FS_CLASS_NAME);

        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName(specTableName);
        tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName()));

        PinotClusterSpec clusterSpec = new PinotClusterSpec();
        clusterSpec.setControllerURI(getControllerBaseApiUrl());

        SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
        jobSpec.setPushJobSpec(pushJobSpec);
        jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
        jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
        jobSpec.setTableSpec(tableSpec);
        jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
        return jobSpec;
    }

    /** Initializes a fresh {@link SegmentMetadataPushJobRunner} with the given spec and runs it. */
    private void runMetadataPush(SegmentGenerationJobSpec jobSpec) throws Exception {
        SegmentMetadataPushJobRunner metadataPushRunner = new SegmentMetadataPushJobRunner();
        metadataPushRunner.init(jobSpec);
        metadataPushRunner.run();
    }

    /** Returns the directory named after the test table inside the controller's data dir. */
    private File controllerTableDataDir() {
        return new File(new File(_controllerConfig.getDataDir()), getTableName());
    }

    /**
     * Counts the entries of a directory, failing the test with a clear message (instead of a bare
     * NullPointerException) when the directory cannot be listed.
     */
    private static int countFiles(File dir) {
        File[] entries = dir.listFiles();
        Assert.assertNotNull(entries, "Cannot list files under: " + dir);
        return entries.length;
    }

    /** Quietly deletes every entry of the given directory; the directory itself is kept. */
    private static void deleteContents(File dir) {
        File[] entries = dir.listFiles();
        Assert.assertNotNull(entries, "Cannot list files under: " + dir);
        for (File entry : entries) {
            FileUtils.deleteQuietly(entry);
        }
    }

    /** Empties the segment dir and the tar dir so that the next segment build starts clean. */
    private void clearSegmentAndTarDirs() {
        deleteContents(_segmentDir);
        deleteContents(_tarDir);
    }

    /**
     * Looks up a segment name by suffix.
     *
     * @param segments JSON array of segment names
     * @param suffix suffix the segment name must end with
     * @return the last listed name ending with {@code suffix}, or {@code null} if there is none
     */
    private static String findSegmentEndingWith(JsonNode segments, String suffix) {
        String match = null;
        for (JsonNode segment : segments) {
            String segmentName = segment.asText();
            if (segmentName.endsWith(suffix)) {
                match = segmentName;
            }
        }
        return match;
    }

    /** Fetches the raw response of the controller's "list all segment lineages" endpoint for the test table. */
    private String fetchSegmentLineage() throws IOException {
        String lineageUrl = ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl())
                .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString());
        return ControllerTest.sendGetRequest(lineageUrl);
    }

    /**
     * Reads {@code segment.total.docs} from the controller's segment metadata.
     *
     * @param segmentName name of the segment to look up
     * @return the document count reported for the segment
     */
    private long getNumDocs(String segmentName) throws IOException {
        String metadataUrl = _controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName);
        JsonNode segmentMetadata = JsonUtils.stringToJsonNode(sendGetRequest(metadataUrl));
        return segmentMetadata.get("segment.total.docs").asLong();
    }

    /** Returns the {@code OFFLINE} entry of the first element of the controller's segment list response. */
    private JsonNode getSegmentsList() throws IOException {
        String segmentListUrl =
                _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString());
        return JsonUtils.stringToJsonNode(sendGetRequest(segmentListUrl)).get(0).get("OFFLINE");
    }

    /**
     * Polls {@code getCurrentCountStarResult()} until it equals the expected value, checking every
     * {@value #COUNT_STAR_CHECK_INTERVAL_MS} ms for up to {@value #COUNT_STAR_TIMEOUT_MS} ms.
     *
     * @param expectedCount the count-star result to wait for
     */
    protected void testCountStar(final long expectedCount) {
        TestUtils.waitForCondition(new Function<Void, Boolean>() {
            @Nullable
            public Boolean apply(@Nullable Void unused) {
                try {
                    return getCurrentCountStarResult() == expectedCount;
                } catch (Exception e) {
                    // Deliberately swallowed: a failed query is reported as null rather than thrown.
                    // NOTE(review): how waitForCondition treats a null result is not visible here -- confirm.
                    return null;
                }
            }
        }, COUNT_STAR_CHECK_INTERVAL_MS, COUNT_STAR_TIMEOUT_MS, "Failed to load " + expectedCount + " documents",
                true);
    }

    /** Returns {@code "mytable"} followed by the random suffix of the current test method. */
    public String getTableName() {
        return "mytable" + _tableNameSuffix;
    }

    /** Drops the offline table used by the test method that just finished. */
    @AfterMethod
    public void tearDownTest() throws IOException {
        dropOfflineTable(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()));
    }

    /** Stops the cluster components in the reverse order of {@link #setUp()}. */
    @AfterClass
    public void tearDown() throws Exception {
        stopServer();
        stopBroker();
        stopController();
        stopZk();
    }
}
