package org.apache.pinot.plugin.ingestion.batch.standalone;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.RecordReaderSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunnerTest.class */
/**
 * Tests for the standalone {@link SegmentGenerationJobRunner}.
 *
 * <p>Each test builds its own scratch directory containing CSV input, a schema file and a table config file,
 * runs the job against it, inspects the {@code .tar.gz} files found in the output directory, and finally
 * removes the scratch directory again.
 */
public class SegmentGenerationJobRunnerTest {
    private static final String TABLE_NAME = "myTable";
    private static final String SCHEMA_NAME = "mySchema";

    /**
     * Verifies that pre-existing output files are left alone when overwriting is disabled, and that the
     * file matching the generated segment is replaced when overwriting is enabled.
     */
    @Test
    public void testSegmentGeneration() throws Exception {
        File testDir = makeTestDir();
        try {
            File inputDir = new File(testDir, "input");
            FileUtils.forceMkdir(inputDir);
            FileUtils.writeLines(new File(inputDir, "input.csv"),
                    Lists.newArrayList("col1,col2", "value1,1", "value2,2"));

            // Seed the output directory with two empty files. The assertions below expect the "_0" file to be
            // the one the job writes, and the "_100" file to never be touched.
            File outputDir = new File(testDir, "output");
            File generatedSegmentTar = new File(outputDir, "myTable_OFFLINE_0.tar.gz");
            File unrelatedSegmentTar = new File(outputDir, "myTable_OFFLINE_100.tar.gz");
            FileUtils.touch(generatedSegmentTar);
            FileUtils.touch(unrelatedSegmentTar);

            SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
                    makeSchemaFile(testDir, SCHEMA_NAME), makeTableConfigFile(testDir, SCHEMA_NAME));

            // Overwrite disabled: both seeded files must survive, and the "_0" file must still be empty.
            jobSpec.setOverwriteOutput(false);
            new SegmentGenerationJobRunner(jobSpec).run();
            Assert.assertTrue(unrelatedSegmentTar.exists(), "Missing " + unrelatedSegmentTar);
            Assert.assertTrue(generatedSegmentTar.exists(), "Missing " + generatedSegmentTar);
            Assert.assertTrue(generatedSegmentTar.isFile(), "Not a regular file: " + generatedSegmentTar);
            Assert.assertEquals(generatedSegmentTar.length(), 0L,
                    "Existing output was modified although overwrite is disabled");

            // Overwrite enabled: the "_0" file must now have content; the "_100" file must still exist.
            jobSpec.setOverwriteOutput(true);
            new SegmentGenerationJobRunner(jobSpec).run();
            Assert.assertTrue(unrelatedSegmentTar.exists(), "Missing " + unrelatedSegmentTar);
            assertNonEmptyFile(generatedSegmentTar);
        } finally {
            // Best-effort cleanup; must never mask a test failure.
            FileUtils.deleteQuietly(testDir);
        }
    }

    /**
     * Verifies that, with recursive search enabled, two input files that share a name but live in different
     * sub-directories each produce their own output under the matching sub-directory of the output directory.
     */
    @Test
    public void testInputFilesWithSameNameInDifferentDirectories() throws Exception {
        File testDir = makeTestDir();
        try {
            File inputDir = new File(testDir, "input");
            File inputDir2009 = new File(inputDir, "2009");
            File inputDir2010 = new File(inputDir, "2010");
            FileUtils.forceMkdir(inputDir2009);
            FileUtils.forceMkdir(inputDir2010);
            FileUtils.writeLines(new File(inputDir2009, "input.csv"),
                    Lists.newArrayList("col1,col2", "value1,1", "value2,2"));
            FileUtils.writeLines(new File(inputDir2010, "input.csv"),
                    Lists.newArrayList("col1,col2", "value3,3", "value4,4"));

            File outputDir = new File(testDir, "output");
            SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
                    makeSchemaFile(testDir, SCHEMA_NAME), makeTableConfigFile(testDir, SCHEMA_NAME));
            jobSpec.setSearchRecursively(true);
            new SegmentGenerationJobRunner(jobSpec).run();

            assertNonEmptyFile(new File(outputDir, "2009/myTable_OFFLINE_0.tar.gz"));
            assertNonEmptyFile(new File(outputDir, "2010/myTable_OFFLINE_0.tar.gz"));
        } finally {
            FileUtils.deleteQuietly(testDir);
        }
    }

    /**
     * Verifies that a bad input file makes the job fail with an exception naming that file, and that exactly
     * one {@code .tar.gz} (for {@code input1}) is present in the output directory afterwards.
     */
    @Test
    public void testFailureHandling() throws Exception {
        File testDir = makeTestDir();
        try {
            File inputDir = new File(testDir, "input");
            FileUtils.forceMkdir(inputDir);
            FileUtils.writeLines(new File(inputDir, "input1.csv"),
                    Lists.newArrayList("col1,col2", "value11,11", "value12,12"));
            // "notanint" is placed in col2, which the schema declares as an INT metric.
            FileUtils.writeLines(new File(inputDir, "input2.csv"),
                    Lists.newArrayList("col1,col2", "value21,notanint", "value22,22"));
            FileUtils.writeLines(new File(inputDir, "input3.csv"),
                    Lists.newArrayList("col1,col2", "value31,31", "value32,32"));

            File outputDir = new File(testDir, "output");
            SegmentGenerationJobSpec jobSpec = makeJobSpec(inputDir, outputDir,
                    makeSchemaFile(testDir, SCHEMA_NAME), makeTableConfigFile(testDir, SCHEMA_NAME));

            // Derive segment names from the input file name (presumably "input1", "input2", ...) so the
            // assertions below can tell which input produced the output that is found.
            SegmentNameGeneratorSpec nameGeneratorSpec = new SegmentNameGeneratorSpec();
            nameGeneratorSpec.setType("inputFile");
            nameGeneratorSpec.getConfigs().put("file.path.pattern", ".+/(.+)\\.csv");
            nameGeneratorSpec.getConfigs().put("segment.name.template", "${filePathPattern:\\1}");
            jobSpec.setSegmentNameGeneratorSpec(nameGeneratorSpec);

            try {
                new SegmentGenerationJobRunner(jobSpec).run();
                // AssertionError is not an Exception, so this is not swallowed by the catch below.
                Assert.fail("Job should have failed");
            } catch (Exception e) {
                String message = e.getMessage();
                Assert.assertNotNull(message, "Exception carries no message: " + e);
                Assert.assertTrue(message.contains("input2.csv"), "Didn't find filename in exception message");

                File[] segmentTars = outputDir.listFiles(new FilenameFilter() {
                    @Override
                    public boolean accept(File dir, String name) {
                        return name.endsWith(".tar.gz");
                    }
                });
                // listFiles returns null when the directory does not exist or cannot be read.
                Assert.assertNotNull(segmentTars, "Output directory is missing or unreadable: " + outputDir);
                Assert.assertEquals(segmentTars.length, 1);
                Assert.assertTrue(segmentTars[0].getName().endsWith("input1.tar.gz"));
            }
        } finally {
            FileUtils.deleteQuietly(testDir);
        }
    }

    /**
     * Asserts that {@code file} exists, is a regular file and has a non-zero length.
     */
    private static void assertNonEmptyFile(File file) {
        Assert.assertTrue(file.exists(), "Missing " + file);
        Assert.assertTrue(file.isFile(), "Not a regular file: " + file);
        Assert.assertTrue(file.length() > 0, "Empty file: " + file);
    }

    /**
     * Creates a fresh, uniquely named scratch directory for one test.
     *
     * @return the newly created directory
     * @throws IOException if the directory cannot be created
     */
    private File makeTestDir() throws IOException {
        // createTempDirectory already creates the directory, so there is no need to delete and recreate it.
        return Files.createTempDirectory("testSegmentGeneration-").toFile();
    }

    /**
     * Writes a schema with a STRING dimension {@code col1} and an INT metric {@code col2} as pretty-printed
     * JSON to {@code <file>/schema}.
     *
     * @param file directory in which the schema file is created
     * @param str  schema name
     * @return the written schema file
     * @throws IOException if the file cannot be written
     */
    private File makeSchemaFile(File file, String str) throws IOException {
        Schema schema = new Schema.SchemaBuilder()
                .setSchemaName(str)
                .addSingleValueDimension("col1", FieldSpec.DataType.STRING)
                .addMetric("col2", FieldSpec.DataType.INT)
                .build();
        File schemaFile = new File(file, "schema");
        FileUtils.write(schemaFile, schema.toPrettyJsonString(), StandardCharsets.UTF_8);
        return schemaFile;
    }

    /**
     * Writes an OFFLINE table config for {@code myTable} with one replica as JSON to
     * {@code <file>/tableConfig}.
     *
     * @param file directory in which the table config file is created
     * @param str  name of the schema the table config refers to
     * @return the written table config file
     * @throws IOException if the file cannot be written
     */
    private File makeTableConfigFile(File file, String str) throws IOException {
        String tableConfigJson = new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(TABLE_NAME)
                .setSchemaName(str)
                .setNumReplicas(1)
                .build()
                .toJsonString();
        File tableConfigFile = new File(file, "tableConfig");
        FileUtils.write(tableConfigFile, tableConfigJson, StandardCharsets.UTF_8);
        return tableConfigFile;
    }

    /**
     * Builds a job spec for a standalone "SegmentCreation" job that reads CSV input through
     * {@link LocalPinotFS}. Output overwriting is enabled by default; tests change it where needed.
     *
     * @param file  input directory
     * @param file2 output directory
     * @param file3 schema file
     * @param file4 table config file
     * @return the populated job spec
     */
    private SegmentGenerationJobSpec makeJobSpec(File file, File file2, File file3, File file4) {
        RecordReaderSpec recordReaderSpec = new RecordReaderSpec();
        recordReaderSpec.setDataFormat("csv");
        recordReaderSpec.setClassName(CSVRecordReader.class.getName());
        recordReaderSpec.setConfigClassName(CSVRecordReaderConfig.class.getName());

        TableSpec tableSpec = new TableSpec();
        tableSpec.setTableName(TABLE_NAME);
        tableSpec.setSchemaURI(file3.toURI().toString());
        tableSpec.setTableConfigURI(file4.toURI().toString());

        ExecutionFrameworkSpec executionFrameworkSpec = new ExecutionFrameworkSpec();
        executionFrameworkSpec.setName("standalone");
        executionFrameworkSpec.setSegmentGenerationJobRunnerClassName(SegmentGenerationJobRunner.class.getName());

        PinotFSSpec pinotFSSpec = new PinotFSSpec();
        pinotFSSpec.setScheme("file");
        pinotFSSpec.setClassName(LocalPinotFS.class.getName());

        SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
        jobSpec.setJobType("SegmentCreation");
        jobSpec.setInputDirURI(file.toURI().toString());
        jobSpec.setOutputDirURI(file2.toURI().toString());
        jobSpec.setOverwriteOutput(true);
        jobSpec.setRecordReaderSpec(recordReaderSpec);
        jobSpec.setTableSpec(tableSpec);
        jobSpec.setExecutionFrameworkSpec(executionFrameworkSpec);
        jobSpec.setPinotFSSpecs(Collections.singletonList(pinotFSSpec));
        return jobSpec;
    }
}
