package org.apache.pinot.connector.flink.sink;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.pinot.connector.flink.common.PinotGenericRowConverter;
import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/connector/flink/sink/PinotSinkFunction.class */
/**
 * Flink sink that buffers incoming records into a Pinot {@link SegmentWriter} and, once a
 * configurable number of records has been collected, flushes them into a segment and hands the
 * resulting segment URI to a {@link SegmentUploader} on a background thread pool.
 *
 * <p>Checkpointing is not supported: {@link #snapshotState(FunctionSnapshotContext)} always
 * throws {@link UnsupportedOperationException}.
 *
 * @param <T> type of the records consumed by this sink
 */
public class PinotSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    /** Default number of collected records after which a segment is flushed. */
    public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500_000L;
    /** Default size of the thread pool that runs segment uploads. */
    public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
    /** Time {@link #close()} waits for the upload pool to terminate before forcing shutdown. */
    public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000L;
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PinotSinkFunction.class);
    private final long _segmentFlushMaxNumRecords;
    private final int _executorPoolSize;
    private final PinotGenericRowConverter<T> _recordConverter;
    private final TableConfig _tableConfig;
    private final Schema _schema;

    @Nullable
    private final String _segmentNamePrefix;

    @Nullable
    private final Long _segmentUploadTimeMs;
    // Runtime-only state; (re)created in open() and therefore excluded from serialization.
    private transient SegmentWriter _segmentWriter;
    private transient SegmentUploader _segmentUploader;
    private transient ExecutorService _executor;
    // Number of records collected since the last flush.
    private transient long _segmentNumRecord;

    /**
     * Creates a sink using {@link #DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS} and
     * {@link #DEFAULT_EXECUTOR_POOL_SIZE}.
     *
     * @param recordConverter converts incoming records to Pinot rows
     * @param tableConfig Pinot table configuration passed to the writer and uploader
     * @param schema Pinot schema passed to the writer
     */
    public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema) {
        this(recordConverter, tableConfig, schema, DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
    }

    /**
     * Creates a sink with an explicit flush threshold and upload pool size, and no segment name
     * prefix or upload time.
     *
     * @param recordConverter converts incoming records to Pinot rows
     * @param tableConfig Pinot table configuration passed to the writer and uploader
     * @param schema Pinot schema passed to the writer
     * @param segmentFlushMaxNumRecords number of collected records that triggers a flush
     * @param executorPoolSize number of threads in the upload pool
     */
    public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
            long segmentFlushMaxNumRecords, int executorPoolSize) {
        this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, executorPoolSize, null, null);
    }

    /**
     * Creates a fully configured sink.
     *
     * @param recordConverter converts incoming records to Pinot rows
     * @param tableConfig Pinot table configuration passed to the writer and uploader
     * @param schema Pinot schema passed to the writer
     * @param segmentFlushMaxNumRecords number of collected records that triggers a flush
     * @param executorPoolSize number of threads in the upload pool
     * @param segmentNamePrefix optional value forwarded to the {@code FlinkSegmentWriter}; may be null
     * @param segmentUploadTimeMs optional value forwarded to the {@code FlinkSegmentWriter}; may be null
     */
    public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
            long segmentFlushMaxNumRecords, int executorPoolSize, @Nullable String segmentNamePrefix,
            @Nullable Long segmentUploadTimeMs) {
        _recordConverter = recordConverter;
        _tableConfig = tableConfig;
        _schema = schema;
        _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
        _executorPoolSize = executorPoolSize;
        _segmentNamePrefix = segmentNamePrefix;
        _segmentUploadTimeMs = segmentUploadTimeMs;
    }

    /**
     * Initializes the segment writer, the segment uploader, the record counter and the upload
     * thread pool for this subtask.
     *
     * @param configuration Flink configuration (not used by this implementation)
     * @throws Exception if the writer or uploader fails to initialize
     */
    public void open(Configuration configuration) throws Exception {
        _segmentWriter = new FlinkSegmentWriter(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getMetricGroup(), _segmentNamePrefix, _segmentUploadTimeMs);
        _segmentWriter.init(_tableConfig, _schema);
        _segmentUploader = new SegmentUploaderDefault();
        _segmentUploader.init(_tableConfig);
        _segmentNumRecord = 0L;
        _executor = Executors.newFixedThreadPool(_executorPoolSize);
        LOG.info("Open Pinot Sink with the table {}", _tableConfig.toJsonString());
    }

    /**
     * Flushes any buffered records (best effort), shuts down the upload pool and closes the
     * segment writer.
     *
     * <p>Uploads still running after {@link #DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS} are cancelled via
     * {@link ExecutorService#shutdownNow()}.
     *
     * @throws Exception if closing the segment writer fails
     */
    public void close() throws Exception {
        LOG.info("Closing Pinot Sink");
        try {
            if (_segmentNumRecord > 0) {
                flush();
            }
        } catch (Exception e) {
            // Best effort: a failed final flush must not prevent resource cleanup below.
            LOG.error("Error when closing Pinot sink", e);
        }
        try {
            // _executor is null when open() failed early or was never called.
            if (_executor != null) {
                _executor.shutdown();
                try {
                    if (!_executor.awaitTermination(DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
                        _executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    _executor.shutdownNow();
                    // Restore the interrupt flag so the calling thread can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            if (_segmentWriter != null) {
                _segmentWriter.close();
            }
        }
    }

    /**
     * Converts the record to a Pinot row, adds it to the segment writer and flushes once the
     * configured record threshold is reached.
     *
     * @param value the incoming record
     * @param context sink context (not used by this implementation)
     * @throws Exception if collecting the row or flushing fails
     */
    public void invoke(T value, SinkFunction.Context context) throws Exception {
        _segmentWriter.collect(_recordConverter.convertToRow(value));
        _segmentNumRecord++;
        if (_segmentNumRecord >= _segmentFlushMaxNumRecords) {
            flush();
        }
    }

    /**
     * Always fails: this sink does not support state snapshots.
     *
     * @throws UnsupportedOperationException always
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        throw new UnsupportedOperationException("snapshotState is invoked in Pinot sink");
    }

    /** No-op: this sink keeps no checkpointed state to restore. */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Flushes the segment writer, resets the record counter and schedules an asynchronous upload
     * of the flushed segment.
     *
     * @throws Exception if the segment writer fails to flush
     */
    private void flush() throws Exception {
        final URI segmentURI = _segmentWriter.flush();
        LOG.info("Pinot segment writer flushed with {} records to {}", _segmentNumRecord, segmentURI);
        _segmentNumRecord = 0L;
        _executor.submit(() -> {
            try {
                _segmentUploader.uploadSegment(segmentURI, (AuthProvider) null);
                LOG.info("Pinot segment uploaded to {}", segmentURI);
            } catch (Exception e) {
                // The returned Future is not inspected, so log here or the failure is lost.
                LOG.error("Failed to upload pinot segment {}", segmentURI, e);
                throw new RuntimeException("Failed to upload pinot segment", e);
            }
        });
    }
}
