package org.apache.pinot.plugin.ingestion.batch.spark3;

import com.google.common.collect.Lists;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.spark.SparkContext;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunnerTest.class */
/**
 * Tests for {@link SparkSegmentGenerationJobRunner} using a local Spark master, CSV input and the local
 * filesystem ({@link LocalPinotFS}).
 */
public class SparkSegmentGenerationJobRunnerTest {
    private static final String TABLE_NAME = "myTable";
    private static final String SEGMENT_NAME_POSTFIX_KEY = "segment.name.postfix";

    private SparkContext _sparkContext;

    /** Creates the local SparkContext shared by all tests in this class. */
    @BeforeClass
    public void setup() {
        // NOTE(review): presumably the runner picks this up as the active context — confirm.
        _sparkContext = new SparkContext("local", SparkSegmentGenerationJobRunnerTest.class.getName());
    }

    /**
     * Stops the SparkContext created in {@link #setup()}. Spark permits only one active SparkContext per JVM,
     * so leaving it running would leak into other test classes executed in the same JVM.
     */
    @AfterClass
    public void tearDown() {
        if (_sparkContext != null) {
            _sparkContext.stop();
            _sparkContext = null;
        }
    }

    /**
     * Builds an APPEND-table job spec under {@code testDir}: a single CSV input file, plus two pre-existing
     * empty segment tarballs in the output directory so the overwrite behaviour can be checked.
     */
    private SegmentGenerationJobSpec setupAppendTableSpec(File testDir) throws Exception {
        File inputDir = new File(testDir, "input");
        writeCsv(new File(inputDir, "input.csv"), "col1,col2", "value1,1", "value2,2");

        // Empty placeholders; with overwriteOutput=false they must survive the job unchanged.
        File outputDir = new File(testDir, "output");
        FileUtils.touch(new File(outputDir, "myTable_OFFLINE_0.tar.gz"));
        FileUtils.touch(new File(outputDir, "myTable_OFFLINE_100.tar.gz"));

        File schemaFile = new File(testDir, "myTable.schema");
        writeSchema(schemaFile, TABLE_NAME);
        File tableConfigFile = new File(testDir, "myTable.table");
        writeTableConfig(tableConfigFile, null);

        return createJobSpec(inputDir, outputDir, schemaFile, tableConfigFile, false);
    }

    /**
     * Builds a REFRESH-table job spec with consistent data push enabled and two CSV input files under
     * {@code testDir}.
     */
    private SegmentGenerationJobSpec setupRefreshTableSpec(File testDir) throws Exception {
        File inputDir = new File(testDir, "input");
        writeCsv(new File(inputDir, "input1.csv"), "col1,col2", "value1,1", "value2,2");
        writeCsv(new File(inputDir, "input2.csv"), "col1,col2", "value3,3", "value4,4");
        File outputDir = new File(testDir, "output");

        File schemaFile = new File(testDir, "myTable.schema");
        writeSchema(schemaFile, TABLE_NAME);

        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
        File tableConfigFile = new File(testDir, "myTable.table");
        writeTableConfig(tableConfigFile, ingestionConfig);

        return createJobSpec(inputDir, outputDir, schemaFile, tableConfigFile, false);
    }

    @Test
    public void testSegmentGeneration() throws Exception {
        File testDir = createTempDir("testSegmentGeneration-");
        try {
            SegmentGenerationJobSpec jobSpec = setupAppendTableSpec(testDir);
            new SparkSegmentGenerationJobRunner(jobSpec).run();

            File outputDir = new File(testDir, "output");
            File segment100 = new File(outputDir, "myTable_OFFLINE_100.tar.gz");
            File segment0 = new File(outputDir, "myTable_OFFLINE_0.tar.gz");
            Assert.assertTrue(segment100.exists());
            Assert.assertTrue(segment0.exists());
            Assert.assertTrue(segment0.isFile());
            // overwriteOutput=false: the pre-existing empty placeholder must not have been replaced.
            Assert.assertEquals(segment0.length(), 0L);

            // overwriteOutput=true: the placeholder is replaced by a real (non-empty) segment tarball.
            jobSpec.setOverwriteOutput(true);
            new SparkSegmentGenerationJobRunner(jobSpec).run();
            Assert.assertTrue(segment100.exists());
            Assert.assertTrue(segment0.exists());
            Assert.assertTrue(segment0.isFile());
            Assert.assertTrue(segment0.length() > 0, "Segment was not regenerated: " + segment0);
        } finally {
            FileUtils.deleteQuietly(testDir);
        }
    }

    @Test
    public void testSegmentGenerationWithConsistentPush() throws Exception {
        File testDir = createTempDir("testSegmentGenerationWithConsistentPush-");
        try {
            SegmentGenerationJobSpec jobSpec = setupRefreshTableSpec(testDir);
            new SparkSegmentGenerationJobRunner(jobSpec).run();

            // The runner is expected to have populated a segment name generator spec holding a numeric postfix.
            Assert.assertNotNull(jobSpec.getSegmentNameGeneratorSpec());
            Assert.assertEquals(jobSpec.getSegmentNameGeneratorSpec().getConfigs().size(), 1);
            Assert.assertTrue(jobSpec.getSegmentNameGeneratorSpec().getConfigs().containsKey(SEGMENT_NAME_POSTFIX_KEY));
            String postfix = (String) jobSpec.getSegmentNameGeneratorSpec().getConfigs().get(SEGMENT_NAME_POSTFIX_KEY);
            Assert.assertNotNull(postfix);
            Assert.assertTrue(postfix.matches("\\d+"), "Unexpected segment name postfix: " + postfix);

            File[] outputFiles = new File(testDir, "output").listFiles();
            Assert.assertNotNull(outputFiles, "Output directory was not created");
            Set<String> outputNames = Arrays.stream(outputFiles).map(File::getName).collect(Collectors.toSet());
            Assert.assertEquals(outputNames,
                Set.of("myTable_OFFLINE_" + postfix + "_0.tar.gz", "myTable_OFFLINE_" + postfix + "_1.tar.gz"));
        } finally {
            FileUtils.deleteQuietly(testDir);
        }
    }

    @Test
    public void testInputFilesWithSameNameInDifferentDirectories() throws Exception {
        File testDir = createTempDir("testInputFilesWithSameNameInDifferentDirectories-");
        try {
            // Two input files with the same name in sibling sub-directories; recursive search must keep them apart.
            File inputDir = new File(testDir, "input");
            writeCsv(new File(new File(inputDir, "2009"), "input.csv"), "col1,col2", "value1,1", "value2,2");
            writeCsv(new File(new File(inputDir, "2010"), "input.csv"), "col1,col2", "value3,3", "value4,4");
            File outputDir = new File(testDir, "output");

            File schemaFile = new File(testDir, "schema");
            writeSchema(schemaFile, "mySchema");
            File tableConfigFile = new File(testDir, "tableConfig");
            writeTableConfig(tableConfigFile, null);

            SegmentGenerationJobSpec jobSpec = createJobSpec(inputDir, outputDir, schemaFile, tableConfigFile, true);
            jobSpec.setSearchRecursively(true);
            new SparkSegmentGenerationJobRunner(jobSpec).run();

            for (String subDir : List.of("2009", "2010")) {
                File segment = new File(outputDir, subDir + "/myTable_OFFLINE_0.tar.gz");
                Assert.assertTrue(segment.exists(), "Missing segment: " + segment);
                Assert.assertTrue(segment.isFile());
                Assert.assertTrue(segment.length() > 0, "Empty segment: " + segment);
            }
        } finally {
            FileUtils.deleteQuietly(testDir);
        }
    }

    /** Creates a fresh, empty temporary directory with the given name prefix. */
    private static File createTempDir(String prefix) throws Exception {
        return Files.createTempDirectory(prefix).toFile();
    }

    /** Writes {@code lines} to {@code file}; FileUtils creates missing parent directories. */
    private static void writeCsv(File file, String... lines) throws Exception {
        FileUtils.writeLines(file, Lists.newArrayList(lines));
    }

    /** Writes a schema named {@code schemaName} with a STRING dimension {@code col1} and INT metric {@code col2}. */
    private static void writeSchema(File schemaFile, String schemaName) throws Exception {
        Schema schema = new Schema.SchemaBuilder().setSchemaName(schemaName)
            .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
            .addMetric("col2", FieldSpec.DataType.INT)
            .build();
        FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
    }

    /** Writes an OFFLINE single-replica config for {@code myTable}, with {@code ingestionConfig} if non-null. */
    private static void writeTableConfig(File tableConfigFile, IngestionConfig ingestionConfig) throws Exception {
        TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(1);
        if (ingestionConfig != null) {
            builder.setIngestionConfig(ingestionConfig);
        }
        FileUtils.write(tableConfigFile, builder.build().toJsonString(), StandardCharsets.UTF_8);
    }

    /**
     * Builds a "SegmentCreation" job spec that reads CSV from {@code inputDir}, writes to {@code outputDir} via
     * {@link LocalPinotFS}, and runs with {@link SparkSegmentGenerationJobRunner}.
     */
    private static SegmentGenerationJobSpec createJobSpec(File inputDir, File outputDir, File schemaFile,
        File tableConfigFile, boolean overwriteOutput) {
        SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
        jobSpec.setJobType("SegmentCreation");
        jobSpec.setInputDirURI(inputDir.toURI().toString());
        jobSpec.setOutputDirURI(outputDir.toURI().toString());
        jobSpec.setOverwriteOutput(overwriteOutput);

        RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
        recordReaderSpec.setDataFormat("csv");
        recordReaderSpec.setClassName(CSVRecordReader.class.getName());
        recordReaderSpec.setConfigClassName(CSVRecordReaderConfig.class.getName());
        jobSpec.setRecordReaderSpec(recordReaderSpec);

        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName(TABLE_NAME);
        tableSpec.setSchemaURI(schemaFile.toURI().toString());
        tableSpec.setTableConfigURI(tableConfigFile.toURI().toString());
        jobSpec.setTableSpec(tableSpec);

        ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
        executionFrameworkSpec.setName("standalone");
        executionFrameworkSpec.setSegmentGenerationJobRunnerClassName(SparkSegmentGenerationJobRunner.class.getName());
        jobSpec.setExecutionFrameworkSpec(executionFrameworkSpec);

        PinotFSSpec pinotFSSpec = new PinotFSSpec();
        pinotFSSpec.setScheme("file");
        pinotFSSpec.setClassName(LocalPinotFS.class.getName());
        jobSpec.setPinotFSSpecs(Collections.singletonList(pinotFSSpec));
        return jobSpec;
    }
}
