package org.apache.pinot.plugin.segmentwriter.filebased;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SegmentWriter} that buffers collected rows in a local Avro file and, on {@link #flush()},
 * builds a Pinot segment from that buffer and writes it as a {@code .tar.gz} into the output directory
 * configured in the table's batch config.
 *
 * <p>All intermediate files live under a per-instance staging directory inside the JVM temp directory;
 * {@link #close()} removes it. This class is not thread-safe.
 */
@NotThreadSafe
public class FileBasedSegmentWriter implements SegmentWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedSegmentWriter.class);
    /** Format of the local buffer file that is handed to the segment builder as its input. */
    private static final FileFormat BUFFER_FILE_FORMAT = FileFormat.AVRO;

    private TableConfig _tableConfig;
    private String _tableNameWithType;
    private BatchIngestionConfig _batchIngestionConfig;
    private BatchConfig _batchConfig;
    private String _outputDirURI;
    private Schema _schema;
    private Set<String> _fieldsToRead;
    private RecordTransformer _recordTransformer;
    private File _stagingDir;
    private File _bufferFile;
    private org.apache.avro.Schema _avroSchema;
    private DataFileWriter<GenericData.Record> _recordWriter;
    private GenericData.Record _reusableRecord;

    /**
     * Initializes the writer using only the batch config found in the table config.
     *
     * @param tableConfig table config; must carry exactly one batch config map
     * @param schema Pinot schema of the table
     * @throws Exception if validation fails or the working directories/buffer cannot be created
     */
    @Override
    public void init(TableConfig tableConfig, Schema schema) throws Exception {
        init(tableConfig, schema, Collections.emptyMap());
    }

    /**
     * Initializes the writer: validates the batch ingestion config, creates the output directory, and
     * sets up the staging directory and an empty Avro buffer file.
     *
     * @param tableConfig table config; must carry exactly one batch config map
     * @param schema Pinot schema of the table
     * @param map entries layered on top of the table's batch config map (they win on key conflicts)
     * @throws IllegalStateException if the batch config is missing, ambiguous, lacks an output dir,
     *     or a working directory cannot be created
     * @throws Exception on any other failure while preparing directories or the buffer
     */
    @Override
    public void init(TableConfig tableConfig, Schema schema, Map<String, String> map) throws Exception {
        _tableConfig = tableConfig;
        _tableNameWithType = _tableConfig.getTableName();

        Preconditions.checkState(
            _tableConfig.getIngestionConfig() != null
                && _tableConfig.getIngestionConfig().getBatchIngestionConfig() != null
                && CollectionUtils.isNotEmpty(
                    _tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()),
            "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps in tableConfig for table: %s",
            _tableNameWithType);
        _batchIngestionConfig = _tableConfig.getIngestionConfig().getBatchIngestionConfig();
        Preconditions.checkState(_batchIngestionConfig.getBatchConfigMaps().size() == 1,
            "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);

        // Copy before merging so the map held by the table config is never mutated.
        Map<String, String> batchConfigMap = new HashMap<>(_batchIngestionConfig.getBatchConfigMaps().get(0));
        batchConfigMap.putAll(map);
        _batchConfig = new BatchConfig(_tableNameWithType, batchConfigMap);

        Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getOutputDirURI()),
            "Must provide: %s in batchConfigs for table: %s", BatchConfigProperties.OUTPUT_DIR_URI,
            _tableNameWithType);
        _outputDirURI = _batchConfig.getOutputDirURI();
        Files.createDirectories(Paths.get(_outputDirURI));

        _schema = schema;
        _fieldsToRead = _schema.getColumnNames();
        _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
        _avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
        _reusableRecord = new GenericData.Record(_avroSchema);

        // Staging layout: <tmp>/segment_writer_staging_<table>_<millis>/buffer_dir/buffer_file
        _stagingDir = new File(FileUtils.getTempDirectory(),
            String.format("segment_writer_staging_%s_%d", _tableNameWithType, System.currentTimeMillis()));
        Preconditions.checkState(_stagingDir.mkdirs(), "Failed to create staging dir: %s",
            _stagingDir.getAbsolutePath());
        File bufferDir = new File(_stagingDir, "buffer_dir");
        Preconditions.checkState(bufferDir.mkdirs(), "Failed to create buffer_dir: %s",
            bufferDir.getAbsolutePath());
        _bufferFile = new File(bufferDir, "buffer_file");
        resetBuffer();

        LOGGER.info("Initialized {} for table: {}", FileBasedSegmentWriter.class.getName(), _tableNameWithType);
    }

    /**
     * Deletes the current buffer file (if any) and opens a fresh Avro writer on the same path.
     *
     * @throws IOException if the new buffer file cannot be created
     */
    private void resetBuffer() throws IOException {
        FileUtils.deleteQuietly(_bufferFile);
        _recordWriter = new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(_avroSchema));
        _recordWriter.create(_avroSchema, _bufferFile);
    }

    /**
     * Runs the row through the record transformer, converts it into the shared Avro record using the
     * schema's column names, and appends it to the buffer file.
     *
     * @param genericRow row to buffer
     * @throws IOException if appending to the buffer file fails
     */
    @Override
    public void collect(GenericRow genericRow) throws IOException {
        GenericRow transformedRow = _recordTransformer.transform(genericRow);
        SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transformedRow, _reusableRecord, _fieldsToRead);
        _recordWriter.append(_reusableRecord);
    }

    /**
     * Builds a segment from the rows buffered so far, tars it into the output directory, and starts a
     * new empty buffer.
     *
     * <p>The Avro writer is closed before the build starts and is re-created only after the tar has been
     * written, so when the build fails the buffer file is left in place. The temporary flush directory
     * is always removed.
     *
     * @return URI of the created {@code <segmentName>.tar.gz} in the output directory
     * @throws IOException if closing the buffer writer fails
     * @throws IllegalStateException if the temporary flush directory cannot be created
     * @throws RuntimeException wrapping any failure while building or tarring the segment, including a
     *     duplicate segment name when overwriting the output is not enabled
     */
    @Override
    public URI flush() throws IOException {
        LOGGER.info("Beginning flush for table: {}", _tableNameWithType);
        _recordWriter.close();

        File flushDir = new File(_stagingDir, "flush_dir_" + System.currentTimeMillis());
        Preconditions.checkState(flushDir.mkdirs(), "Failed to create flush dir: %s", flushDir);

        try {
            File segmentDir = new File(flushDir, "segment_dir");

            // Point the segment builder at the local buffer file and the temporary segment dir.
            Map<String, String> batchConfigMapOverride = new HashMap<>(_batchConfig.getBatchConfigMap());
            batchConfigMapOverride.put(BatchConfigProperties.INPUT_DIR_URI, _bufferFile.getAbsolutePath());
            batchConfigMapOverride.put(BatchConfigProperties.OUTPUT_DIR_URI, segmentDir.getAbsolutePath());
            batchConfigMapOverride.put(BatchConfigProperties.INPUT_FORMAT, BUFFER_FILE_FORMAT.toString());
            BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(
                Lists.newArrayList(batchConfigMapOverride),
                _batchIngestionConfig.getSegmentIngestionType(),
                _batchIngestionConfig.getSegmentIngestionFrequency(),
                _batchIngestionConfig.getConsistentDataPush());

            String segmentName = IngestionUtils.buildSegment(
                IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema, batchIngestionConfig));
            LOGGER.info("Successfully built segment: {} for table: {}", segmentName, _tableNameWithType);

            File segmentTar = new File(_outputDirURI, segmentName + ".tar.gz");
            if (segmentTar.exists()) {
                if (!_batchConfig.isOverwriteOutput()) {
                    throw new IllegalArgumentException(String.format(
                        "Duplicate segment name generated '%s' in '%s', please adjust segment name generator "
                            + "config to avoid duplicates, or allow batch config overwrite",
                        segmentName, _outputDirURI));
                }
                // Report the existing tar that is being replaced, not the temporary build directory.
                LOGGER.warn("Duplicate segment name detected '{}' in file '{}', deleting old segment",
                    segmentName, segmentTar);
                if (segmentTar.delete()) {
                    LOGGER.warn("Segment file deleted: '{}/{}'", _outputDirURI, segmentName);
                } else {
                    // Best effort: keep going and let the tar step deal with the existing file.
                    LOGGER.warn("Failed to delete existing segment file: '{}'", segmentTar);
                }
            }

            TarGzCompressionUtils.createTarGzFile(new File(segmentDir, segmentName), segmentTar);
            LOGGER.info("Created segment tar: {} for segment: {} of table: {}", segmentTar.getAbsolutePath(),
                segmentName, _tableNameWithType);

            resetBuffer();
            return segmentTar.toURI();
        } catch (Exception e) {
            throw new RuntimeException(String.format(
                "Caught exception while generating segment from buffer file: %s for table:%s",
                _bufferFile.getAbsolutePath(), _tableNameWithType), e);
        } finally {
            FileUtils.deleteQuietly(flushDir);
        }
    }

    /**
     * Closes the buffer writer and deletes the staging directory.
     *
     * <p>The staging directory is removed even when closing the writer fails, and a writer that was
     * never initialized is skipped rather than causing a {@link NullPointerException}.
     *
     * @throws IOException if closing the buffer writer fails
     */
    @Override
    public void close() throws IOException {
        LOGGER.info("Closing {} for table: {}", FileBasedSegmentWriter.class.getName(), _tableNameWithType);
        try {
            if (_recordWriter != null) {
                _recordWriter.close();
            }
        } finally {
            FileUtils.deleteQuietly(_stagingDir);
        }
    }
}
