package org.apache.pinot.tools.streams.githubevents;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URL;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.streams.PinotRealtimeSource;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.KinesisStarterUtils;
import org.apache.pinot.tools.utils.StreamSourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.class */
public class PullRequestMergedEventsStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class);
    private static final String PULSAR_DATA_PRODUCER_CLASS_NAME = "org.apache.pinot.plugin.stream.pulsar.server.PulsarDataProducer";
    private PinotRealtimeSource _pinotStream;

    public PullRequestMergedEventsStream(File file, String str, String str2, StreamDataProducer streamDataProducer) throws Exception {
        this._pinotStream = PinotRealtimeSource.builder().setProducer(streamDataProducer).setGenerator(new GithubPullRequestSourceGenerator(file, str2)).setTopic(str).build();
    }

    public PullRequestMergedEventsStream(String str, String str2, String str3, StreamDataProducer streamDataProducer) throws Exception {
        this(getSchemaFile(str), str2, str3, streamDataProducer);
    }

    public static File getSchemaFile(String str) {
        File file;
        try {
            if (str == null) {
                URL resource = PullRequestMergedEventsStream.class.getClassLoader().getResource("examples/stream/pullRequestMergedEvents/pullRequestMergedEvents_schema.json");
                Preconditions.checkNotNull(resource);
                file = new File(resource.getFile());
            } else {
                file = new File(str);
            }
            return file;
        } catch (Exception e) {
            LOGGER.error("Got exception while reading Pinot schema from file: [{}]", str);
            throw e;
        }
    }

    public static StreamDataProducer getKafkaStreamDataProducer() throws Exception {
        return getKafkaStreamDataProducer(KafkaStarterUtils.DEFAULT_KAFKA_BROKER, null, null, null);
    }

    public static StreamDataProducer getKafkaStreamDataProducer(String str, String str2, String str3, String str4) throws Exception {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", str);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        if (StringUtils.isNotEmpty(str2)) {
            properties.put("security.protocol", str2);
            if (str2.equals("SASL_SSL") && StringUtils.isNotEmpty(str3) && StringUtils.isNotEmpty(str4)) {
                properties.put("sasl.mechanism", "PLAIN");
                properties.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required \n username=\"%s\" \n password=\"%s\";", str3, str4));
            }
        }
        return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
    }

    public static StreamDataProducer getKinesisStreamDataProducer(String str, String str2, String str3, String str4) throws Exception {
        Properties properties = new Properties();
        if (StringUtils.isNotEmpty(str3) && StringUtils.isNotEmpty(str4)) {
            properties.put("access", str3);
            properties.put("secret", str4);
        }
        if (StringUtils.isNotEmpty(str)) {
            properties.put("endpoint", str);
        }
        properties.put("region", str2);
        return StreamDataProvider.getStreamDataProducer(KinesisStarterUtils.KINESIS_PRODUCER_CLASS_NAME, properties);
    }

    public static StreamDataProducer getPulsarStreamDataProducer(String str, String str2) throws Exception {
        Properties properties = new Properties();
        if (StringUtils.isNotEmpty(str)) {
            properties.put("brokerServiceUrl", str);
        }
        if (StringUtils.isNotEmpty(str2)) {
            properties.put("token", str2);
        }
        return StreamDataProvider.getStreamDataProducer(PULSAR_DATA_PRODUCER_CLASS_NAME, properties);
    }

    public static StreamDataProducer getKinesisStreamDataProducer() throws Exception {
        return getKinesisStreamDataProducer(KinesisStarterUtils.DEFAULT_KINESIS_ENDPOINT, "us-east-1", "access", "secret");
    }

    public static StreamDataProducer getStreamDataProducer(StreamSourceType streamSourceType) throws Exception {
        switch (streamSourceType) {
            case KAFKA:
                return getKafkaStreamDataProducer();
            case KINESIS:
                return getKinesisStreamDataProducer();
            default:
                throw new RuntimeException("Invalid streamSourceType specified: " + streamSourceType);
        }
    }

    public static void main(String[] strArr) throws Exception {
        new PullRequestMergedEventsStream(strArr[1], "pullRequestMergedEvent", strArr[0], getKafkaStreamDataProducer()).execute();
    }

    public void execute() {
        start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));
    }

    public void shutdown() throws Exception {
        Quickstart.printStatus(Quickstart.Color.GREEN, "***** Shutting down pullRequestMergedEvents Stream *****");
        this._pinotStream.close();
    }

    public void start() {
        Quickstart.printStatus(Quickstart.Color.CYAN, "***** Starting pullRequestMergedEvents Stream *****");
        this._pinotStream.run();
    }
}
