package org.apache.pinot.tools;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
import org.apache.pinot.tools.utils.KinesisStarterUtils;
import org.apache.pinot.tools.utils.StreamSourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/GitHubEventsQuickstart.class */
/**
 * Quickstart that starts a stream server (Kinesis or Kafka), starts a Pinot cluster through
 * {@link QuickstartRunner}, adds the {@code pullRequestMergedEvents} realtime table, starts a
 * {@link PullRequestMergedEventsStream} publishing into the stream, and finally prints the results
 * of a few sample queries.
 *
 * <p>Configure the instance with {@link #setPersonalAccessToken(String)} and
 * {@link #setSourceType(String)} before calling {@link #execute()}.
 */
public class GitHubEventsQuickstart extends QuickStartBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(GitHubEventsQuickstart.class);

    /** Name shared by the table, the stream topic and the working sub-directory. */
    private static final String TABLE_NAME = "pullRequestMergedEvents";
    private static final String SCHEMA_RESOURCE =
            "examples/stream/pullRequestMergedEvents/pullRequestMergedEvents_schema.json";
    private static final String KAFKA_TABLE_CONFIG_RESOURCE =
            "examples/stream/pullRequestMergedEvents/pullRequestMergedEvents_realtime_table_config.json";
    private static final String KINESIS_TABLE_CONFIG_RESOURCE =
            "examples/stream/pullRequestMergedEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json";
    private static final String SEPARATOR = "***************************************************";

    private StreamDataServerStartable _serverStarter;
    private String _personalAccessToken;
    private StreamSourceType _sourceType;

    /**
     * Starts the Kinesis server startable on port 4566, creates the {@code pullRequestMergedEvents}
     * topic with 3 shards and registers a JVM shutdown hook that stops the server again.
     *
     * @throws RuntimeException wrapping any failure raised while starting or configuring the server
     */
    private void startKinesis() {
        try {
            Properties serverProps = new Properties();
            serverProps.put("port", 4566);
            StreamDataServerStartable kinesisStarter = StreamDataProvider.getServerDataStartable(
                    KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, serverProps);
            this._serverStarter = kinesisStarter;
            kinesisStarter.start();

            Properties topicProps = new Properties();
            topicProps.put(KinesisStarterUtils.NUM_SHARDS, 3);
            kinesisStarter.createTopic(TABLE_NAME, topicProps);

            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    printStatus(Quickstart.Color.GREEN, "***** Shutting down Kinesis *****");
                    kinesisStarter.stop();
                } catch (Exception e) {
                    // Best effort: the JVM is going down anyway, so only report the failure.
                    LOGGER.error("Caught exception in shutting down Kinesis", e);
                }
            }));
        } catch (Exception e) {
            throw new RuntimeException(
                    "Failed to start " + KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, e);
        }
    }

    /** Starts the stream server matching the configured source type; anything but Kinesis uses Kafka. */
    private void startStreamServer() {
        if (this._sourceType == StreamSourceType.KINESIS) {
            startKinesis();
        } else {
            startKafka();
        }
    }

    /**
     * Runs the whole quickstart: prepares the working directories, starts the stream server and the
     * cluster, adds the table, starts the event stream and prints the sample query results.
     *
     * @param personalAccessToken token handed to {@link PullRequestMergedEventsStream}
     * @param sourceType source type shown in the first status message
     * @throws Exception if any step of the setup or any query fails
     */
    private void execute(String personalAccessToken, StreamSourceType sourceType) throws Exception {
        File baseDir = new File("githubEvents-" + System.currentTimeMillis());
        File tableDir = new File(baseDir, TABLE_NAME);
        if (!tableDir.exists()) {
            Preconditions.checkState(tableDir.mkdirs());
        }

        // Copy the bundled schema and table config out of the classpath into the table directory.
        File schemaFile = new File(tableDir, "pullRequestMergedEvents_schema.json");
        File tableConfigFile = new File(tableDir, "pullRequestMergedEvents_realtime_table_config.json");
        ClassLoader classLoader = Quickstart.class.getClassLoader();
        URL schemaUrl = classLoader.getResource(SCHEMA_RESOURCE);
        Preconditions.checkNotNull(schemaUrl);
        FileUtils.copyURLToFile(schemaUrl, schemaFile);
        URL tableConfigUrl = classLoader.getResource(getTableConfigFilePath());
        Preconditions.checkNotNull(tableConfigUrl);
        FileUtils.copyURLToFile(tableConfigUrl, tableConfigFile);

        File runnerTempDir = new File(FileUtils.getTempDirectory(), String.valueOf(System.currentTimeMillis()));
        Preconditions.checkState(runnerTempDir.mkdirs());
        QuickstartRunner runner = new QuickstartRunner(
                Lists.newArrayList(new QuickstartTableRequest(tableDir.getAbsolutePath())),
                1, 1, 1, 1, runnerTempDir, getConfigOverrides());

        printStatus(Quickstart.Color.CYAN, String.format("***** Starting %s *****", sourceType));
        startStreamServer();
        printStatus(Quickstart.Color.CYAN, "***** Starting zookeeper, controller, server and broker *****");
        runner.startAll();
        printStatus(Quickstart.Color.CYAN, "***** Adding pullRequestMergedEvents table *****");
        runner.bootstrapTable();
        printStatus(Quickstart.Color.CYAN, String.format(
                "***** Starting pullRequestMergedEvents data stream and publishing to %s *****", this._sourceType));
        new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(), TABLE_NAME, personalAccessToken,
                PullRequestMergedEventsStream.getStreamDataProducer(this._sourceType)).execute();
        printStatus(Quickstart.Color.CYAN, "***** Waiting for 10 seconds for a few events to get populated *****");
        Thread.sleep(10000L);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                printStatus(Quickstart.Color.GREEN, "***** Shutting down GitHubEventsQuickStart *****");
                runner.stop();
                // Remove the whole timestamped working directory, not just the table sub-directory,
                // so repeated runs do not leave empty "githubEvents-*" directories behind.
                FileUtils.deleteDirectory(baseDir);
            } catch (Exception e) {
                LOGGER.error("Caught exception in shutting down GitHubEvents QuickStart", e);
            }
        }));

        printStatus(Quickstart.Color.YELLOW, "***** Realtime github demo quickstart setup complete *****");
        runAndPrintQuery(runner, "Total number of documents in the table",
                "select count(*) from pullRequestMergedEvents limit 0");
        runAndPrintQuery(runner, "Top 10 repo with the most lines added",
                "select sum(numLinesAdded) from pullRequestMergedEvents group by repo top 10 limit 0");
        runAndPrintQuery(runner, "Show data for COLLABORATORS",
                "select * from pullRequestMergedEvents where authorAssociation = 'COLLABORATOR' limit 10");
        runAndPrintQuery(runner, "Show repos with longest alive pull requests",
                "select max(elapsedTimeMillis) from pullRequestMergedEvents group by repo top 10 limit 0");
        runAndPrintQuery(runner, "Total number of documents in the table",
                "select count(*) from pullRequestMergedEvents");
        printStatus(Quickstart.Color.GREEN,
                "You can always go to http://localhost:9000 to play around in the query console");
    }

    /**
     * Prints a description, the query text, the pretty-printed query response and a separator line.
     *
     * @param runner runner used to execute the query
     * @param description human-readable heading printed before the query
     * @param query query text to print and run
     * @throws Exception if running the query fails
     */
    private void runAndPrintQuery(QuickstartRunner runner, String description, String query) throws Exception {
        printStatus(Quickstart.Color.YELLOW, description);
        printStatus(Quickstart.Color.CYAN, "Query : " + query);
        printStatus(Quickstart.Color.YELLOW, prettyPrintResponse(runner.runQuery(query)));
        printStatus(Quickstart.Color.GREEN, SEPARATOR);
    }

    /** Returns the classpath location of the table config for the configured source type. */
    private String getTableConfigFilePath() {
        return this._sourceType == StreamSourceType.KINESIS
                ? KINESIS_TABLE_CONFIG_RESOURCE
                : KAFKA_TABLE_CONFIG_RESOURCE;
    }

    /** Returns the quickstart type names this class answers to. */
    @Override
    public List<String> types() {
        return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS");
    }

    /**
     * Runs the quickstart with the configured personal access token and source type.
     *
     * @throws NullPointerException if no source type has been set via {@link #setSourceType(String)}
     * @throws Exception if any step of the quickstart fails
     */
    @Override
    public void execute() throws Exception {
        // Fail before any directory is created or server started, with a message that names the cause.
        Preconditions.checkNotNull(this._sourceType,
                "Stream source type is not set; call setSourceType(...) before execute()");
        execute(this._personalAccessToken, this._sourceType);
    }

    /**
     * Sets the personal access token passed to the event stream.
     *
     * @param str the token
     * @return this instance, for chaining
     */
    public GitHubEventsQuickstart setPersonalAccessToken(String str) {
        this._personalAccessToken = str;
        return this;
    }

    /**
     * Sets the stream source type from its case-insensitive name.
     *
     * @param str name of a {@link StreamSourceType} constant, in any letter case
     * @return this instance, for chaining
     * @throws IllegalArgumentException if the name matches no {@link StreamSourceType} constant
     * @throws NullPointerException if {@code str} is null
     */
    public GitHubEventsQuickstart setSourceType(String str) {
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
        // (e.g. under a Turkish locale "kinesis" would otherwise become "KİNESİS").
        this._sourceType = StreamSourceType.valueOf(str.toUpperCase(Locale.ROOT));
        return this;
    }
}
