package org.apache.pinot.tools.streams;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.QuickStartBase;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;

/* loaded from: input_file:org/apache/pinot/tools/streams/AirlineDataStream.class */
/**
 * Publishes rows read from an airline-stats Avro file to the {@code flights-realtime} topic
 * through a {@link PinotRealtimeSource}.
 *
 * <p>The time column of every published row is rewritten so that it starts at {@code 16102}
 * and grows by one for every 60 rows handed to the generator callback.
 *
 * <p>NOTE(review): no synchronization is performed in this class; it is presumably meant to be
 * driven from a single thread — confirm against callers.
 */
public class AirlineDataStream {
    private static final String KAFKA_TOPIC_NAME = "flights-realtime";

    /** Number of rows after which the rewritten time value advances by one. */
    private static final int ROWS_PER_TIME_INCREMENT = 60;

    /** Publish rate handed to the {@link PinotRealtimeSource} builder. */
    private static final long MAX_MESSAGES_PER_SECOND = 1L;

    Schema _pinotSchema;
    String _timeColumnName;
    File _avroFile;

    /** First time value assigned to realtime rows. */
    final Integer _startTime = 16102;

    /** Producer handed to the stream; set to {@code null} once it has been closed. */
    private StreamDataProducer _producer;
    private final PinotRealtimeSource _pinotStream;

    /**
     * Builds the stream from explicit components.
     *
     * @param schema schema passed to the Avro generator
     * @param tableConfig table config whose validation config supplies the time column name
     * @param file Avro file to read rows from
     * @param streamDataProducer producer the rows are published with; closed by {@link #shutdown()}
     * @throws IOException declared by the original signature; presumably raised while opening
     *     the Avro file — confirm against {@code AvroFileSourceGenerator}
     */
    public AirlineDataStream(Schema schema, TableConfig tableConfig, File file, StreamDataProducer streamDataProducer)
            throws IOException {
        _pinotSchema = schema;
        _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
        _avroFile = file;
        _producer = streamDataProducer;

        // The third argument (1) is forwarded as-is; NOTE(review): it looks like a batch size — confirm.
        // The callback maps the value supplied by the generator to the rewritten time value:
        // start time plus one per ROWS_PER_TIME_INCREMENT.
        AvroFileSourceGenerator generator = new AvroFileSourceGenerator(schema, file, 1, _timeColumnName,
                rowNumber -> this._startTime + rowNumber / ROWS_PER_TIME_INCREMENT);

        _pinotStream = PinotRealtimeSource.builder()
                .setProducer(_producer)
                .setGenerator(generator)
                .setTopic(KAFKA_TOPIC_NAME)
                .setMaxMessagePerSecond(MAX_MESSAGES_PER_SECOND)
                .build();

        QuickStartBase.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 60 events (which is approximately 60 seconds) *****");
    }

    /**
     * Builds the stream from a directory containing {@code airlineStats_schema.json},
     * {@code airlineStats_realtime_table_config.json} and {@code rawdata/airlineStats_data.avro},
     * using the producer returned by {@link #getDefaultKafkaProducer()}.
     *
     * @param file directory that holds the three resources listed above
     * @throws Exception if any resource cannot be loaded or the producer cannot be created
     */
    public AirlineDataStream(File file) throws Exception {
        this(Schema.fromFile(new File(file, "airlineStats_schema.json")),
                (TableConfig) JsonUtils.fileToObject(new File(file, "airlineStats_realtime_table_config.json"), TableConfig.class),
                new File(file, "rawdata/airlineStats_data.avro"),
                getDefaultKafkaProducer());
    }

    /**
     * Creates a producer of class {@code KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME} configured
     * with the default broker address, the {@code kafka.serializer.DefaultEncoder} serializer
     * class and {@code request.required.acks=1}.
     *
     * @return the newly created producer; the caller is responsible for closing it
     * @throws Exception if the producer cannot be created
     */
    public static StreamDataProducer getDefaultKafkaProducer() throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
        producerProps.put("request.required.acks", "1");
        return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, producerProps);
    }

    /** Delegates to {@link PinotRealtimeSource#run()} on the underlying stream. */
    public void run() {
        _pinotStream.run();
    }

    /**
     * Closes the underlying stream and then the producer.
     *
     * <p>The producer is closed even when closing the stream fails, and it is skipped when a
     * previous call has already closed it, so a repeated call no longer dereferences {@code null}.
     *
     * @throws Exception if closing the stream or the producer fails; when both fail, the
     *     producer's exception is the one propagated
     */
    public void shutdown() throws Exception {
        try {
            _pinotStream.close();
        } finally {
            if (_producer != null) {
                _producer.close();
                _producer = null;
            }
        }
    }
}
