package org.apache.pinot.tools.admin.command;

import java.util.Locale;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.tools.Command;
import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.StreamSourceType;
import picocli.CommandLine;

@CommandLine.Command(name = "StreamGitHubEvents")
/* loaded from: input_file:org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.class */
public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand implements Command {
    private static final String PULL_REQUEST_MERGED_EVENT_TYPE = "pullRequestMergedEvent";

    @CommandLine.Option(names = {"-personalAccessToken"}, required = true, description = {"GitHub personal access token."})
    private String _personalAccessToken;

    @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka", description = {"Stream DataSource to use for ingesting data. Supported values - Kafka,Kinesis"})
    private String _sourceType;

    @CommandLine.Option(names = {"-awsAccessKey"}, description = {"AccessKey for AWS Account."})
    private String _accessKey;

    @CommandLine.Option(names = {"-awsSecretKey"}, description = {"SecretKey for AWS Account"})
    private String _secretKey;

    @CommandLine.Option(names = {"-topic"}, required = true, description = {"Name of kafka-topic/kinesis-stream to publish events."})
    private String _topic;

    @CommandLine.Option(names = {"-schemaFile"}, description = {"Path to schema file. By default uses examples/stream/githubEvents/pullRequestMergedEvents_schema.json"})
    private String _schemaFile;

    @CommandLine.Option(names = {"-kafkaBrokerList"}, description = {"Kafka broker list of the kafka cluster to produce events."})
    private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;

    @CommandLine.Option(names = {"-kinesisEndpoint"}, description = {"Endpoint of localstack or any other Kinesis cluster when not using AWS."})
    private String _kinesisEndpoint = null;

    @CommandLine.Option(names = {"-awsRegion"}, description = {"AWS Region in which Kinesis is located"})
    private String _awsRegion = "us-east-1";

    @CommandLine.Option(names = {"-eventType"}, description = {"Type of GitHub event. Supported types - pullRequestMergedEvent"})
    private String _eventType = PULL_REQUEST_MERGED_EVENT_TYPE;

    @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true, description = {"Print this message."})
    private boolean _help = false;

    public void setPersonalAccessToken(String str) {
        this._personalAccessToken = str;
    }

    public void setKafkaBrokerList(String str) {
        this._kafkaBrokerList = str;
    }

    public void setTopic(String str) {
        this._topic = str;
    }

    public void setEventType(String str) {
        this._eventType = str;
    }

    public void setSchemaFile(String str) {
        this._schemaFile = str;
    }

    @Override // org.apache.pinot.tools.Command
    public boolean getHelp() {
        return this._help;
    }

    @Override // org.apache.pinot.tools.AbstractBaseCommand
    public String getName() {
        return "StreamGitHubEvents";
    }

    public String toString() {
        return "StreamGitHubEvents -personalAccessToken " + this._personalAccessToken + " -kafkaBrokerList " + this._kafkaBrokerList + " -topic " + this._topic + " eventType " + this._eventType + " schemaFile " + this._schemaFile;
    }

    @Override // org.apache.pinot.tools.AbstractBaseCommand
    public void cleanup() {
    }

    @Override // org.apache.pinot.tools.Command
    public String description() {
        return "Streams GitHubEvents into a Kafka topic or Kinesis Stream";
    }

    @Override // org.apache.pinot.tools.Command
    public boolean execute() throws Exception {
        StreamDataProducer kafkaStreamDataProducer;
        PluginManager.get().init();
        if (!PULL_REQUEST_MERGED_EVENT_TYPE.equals(this._eventType)) {
            throw new UnsupportedOperationException("Event type " + this._eventType + " is unsupported");
        }
        switch (StreamSourceType.valueOf(this._sourceType.toUpperCase())) {
            case KINESIS:
                kafkaStreamDataProducer = PullRequestMergedEventsStream.getKinesisStreamDataProducer(this._kinesisEndpoint, this._awsRegion, this._accessKey, this._secretKey);
                break;
            case KAFKA:
            default:
                kafkaStreamDataProducer = PullRequestMergedEventsStream.getKafkaStreamDataProducer(this._kafkaBrokerList);
                break;
        }
        new PullRequestMergedEventsStream(this._schemaFile, this._topic, this._personalAccessToken, kafkaStreamDataProducer).execute();
        return true;
    }
}
