package org.apache.pinot.plugin.stream.kinesis.server;

import cloud.localstack.Localstack;
import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
import com.google.common.base.Function;
import java.net.URI;
import java.util.HashMap;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.pinot.plugin.stream.kinesis.KinesisConfig;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.utils.AttributeMap;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.class */
/**
 * {@link StreamDataServerStartable} that runs a Kinesis endpoint through Localstack.
 *
 * <p>{@link #init(Properties)} builds the Localstack Docker configuration (image tag {@code 0.12.15},
 * {@code SERVICES} set to {@link KinesisConfig#STREAM_TYPE}) and derives the local endpoint URL from
 * the {@code port} property. {@link #createTopic(String, Properties)} creates a stream on that
 * endpoint and waits for it to report the {@code ACTIVE} status.
 */
public class KinesisDataServerStartable implements StreamDataServerStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisDataServerStartable.class);

    /** Key in the topic properties that holds the number of shards for a new stream. */
    public static final String NUM_SHARDS_PROPERTY = "numShards";
    public static final String DEFAULT_REGION = "us-east-1";
    public static final String DEFAULT_ACCESS_KEY = "access";
    public static final String DEFAULT_SECRET_KEY = "secret";
    /** Edge port used when the server properties do not contain a {@code port} entry. */
    public static final String DEFAULT_PORT = "4566";

    private static final String PORT_PROPERTY = "port";
    // Kept as an immutable template so that every init() call formats it afresh.
    private static final String ENDPOINT_TEMPLATE = "http://localhost:%s";
    private static final String ACTIVE_STREAM_STATUS = "ACTIVE";
    private static final long STREAM_STATUS_CHECK_INTERVAL_MS = 1000L;
    private static final long STREAM_ACTIVE_TIMEOUT_MS = 30000L;

    LocalstackDockerConfiguration _dockerConfig;
    Properties _serverProperties;
    private final Localstack _localstackDocker = Localstack.INSTANCE;
    private String _localStackKinesisEndpoint = String.format(ENDPOINT_TEMPLATE, DEFAULT_PORT);

    /**
     * Stores the server properties and prepares the Localstack Docker configuration.
     *
     * @param properties server properties; the optional {@code port} entry selects the Localstack edge
     *     port and defaults to {@link #DEFAULT_PORT}
     */
    public void init(Properties properties) {
        this._serverProperties = properties;
        String port = properties.getProperty(PORT_PROPERTY, DEFAULT_PORT);

        HashMap<String, String> environment = new HashMap<>();
        environment.put("SERVICES", KinesisConfig.STREAM_TYPE);

        this._dockerConfig = LocalstackDockerConfiguration.builder()
            .portEdge(port)
            .portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571)))
            .imageTag("0.12.15")
            .environmentVariables(environment)
            .build();
        this._localStackKinesisEndpoint = String.format(ENDPOINT_TEMPLATE, port);
    }

    /** Starts Localstack with the configuration built by {@link #init(Properties)}. */
    public void start() {
        this._localstackDocker.startup(this._dockerConfig);
    }

    /** Stops Localstack. */
    public void stop() {
        this._localstackDocker.stop();
    }

    /**
     * Creates a Kinesis stream and waits up to 30 seconds for it to become {@code ACTIVE}.
     *
     * <p>Failures are logged as warnings and never propagated to the caller.
     *
     * @param str name of the stream to create
     * @param properties topic properties; {@link #NUM_SHARDS_PROPERTY} must hold the shard count, either
     *     as a {@link Number} or as a numeric {@link String}
     */
    public void createTopic(final String str, Properties properties) {
        // try-with-resources releases the Kinesis client once the stream has been created (or creation failed).
        // NOTE(review): the HTTP client handed over through httpClient(...) is presumably owned by the caller
        // and not closed together with the Kinesis client - confirm against the AWS SDK documentation.
        try (KinesisClient kinesisClient = buildKinesisClient()) {
            kinesisClient.createStream(
                CreateStreamRequest.builder().streamName(str).shardCount(resolveShardCount(properties)).build());

            boolean active = waitForCondition(
                ignored -> isStreamActive(kinesisClient, str),
                STREAM_STATUS_CHECK_INTERVAL_MS,
                STREAM_ACTIVE_TIMEOUT_MS,
                "Kinesis stream " + str + " is not created or is not in active state");
            // Only report success when the stream was actually observed in the ACTIVE state.
            if (active) {
                LOGGER.info("Kinesis stream created successfully: " + str);
            }
        } catch (Exception e) {
            LOGGER.warn("Error occurred while creating topic: " + str, e);
        }
    }

    /** Returns the edge port reported by Localstack. */
    public int getPort() {
        return this._localstackDocker.getEdgePort();
    }

    /** Builds a Kinesis client that talks to the local endpoint and trusts all certificates. */
    private KinesisClient buildKinesisClient() {
        return KinesisClient.builder()
            .httpClient(new ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults(
                AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
            .credentialsProvider(getLocalAWSCredentials())
            .region(Region.of(DEFAULT_REGION))
            .endpointOverride(URI.create(this._localStackKinesisEndpoint))
            .build();
    }

    /**
     * Reads the shard count from the topic properties.
     *
     * @throws IllegalArgumentException if the property is missing or is neither a number nor a numeric string
     */
    private static Integer resolveShardCount(Properties properties) {
        Object value = properties.get(NUM_SHARDS_PROPERTY);
        if (value == null) {
            // Also consult the defaults of the Properties object, which get(Object) does not see.
            value = properties.getProperty(NUM_SHARDS_PROPERTY);
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            return Integer.valueOf(((String) value).trim());
        }
        throw new IllegalArgumentException(
            "Property '" + NUM_SHARDS_PROPERTY + "' must be set to an integer shard count, got: " + value);
    }

    /** Returns whether the stream currently reports the ACTIVE status; lookup failures count as not active. */
    private static boolean isStreamActive(KinesisClient kinesisClient, String streamName) {
        try {
            String status = kinesisClient
                .describeStream(DescribeStreamRequest.builder().streamName(streamName).build())
                .streamDescription()
                .streamStatusAsString();
            return ACTIVE_STREAM_STATUS.equals(status);
        } catch (Exception e) {
            LOGGER.warn("Could not fetch kinesis stream status", e);
            return false;
        }
    }

    private AwsCredentialsProvider getLocalAWSCredentials() {
        return StaticCredentialsProvider.create(AwsBasicCredentials.create(DEFAULT_ACCESS_KEY, DEFAULT_SECRET_KEY));
    }

    /**
     * Polls a condition until it yields {@code true} or the timeout elapses.
     *
     * @param condition condition to evaluate; it is always applied to {@code null}
     * @param checkIntervalMs pause between two evaluations, in milliseconds
     * @param timeoutMs maximum total waiting time, in milliseconds
     * @param errorMessage optional detail appended to the logged error messages
     * @return {@code true} if the condition was met, {@code false} on timeout or interruption
     */
    private static boolean waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
            @Nullable String errorMessage) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        String errorMessageSuffix = errorMessage != null ? ", error message: " + errorMessage : "";

        while (System.currentTimeMillis() < deadline) {
            boolean conditionMet = false;
            try {
                conditionMet = Boolean.TRUE.equals(condition.apply(null));
            } catch (Exception e) {
                LOGGER.error("Caught exception while checking the condition" + errorMessageSuffix, e);
            }
            if (conditionMet) {
                return true;
            }
            // Sleep after a failed check as well, so a throwing condition cannot turn this into a busy loop.
            try {
                Thread.sleep(checkIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting so the caller can react to the interruption.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while waiting for the condition" + errorMessageSuffix, e);
                return false;
            }
        }
        LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" + errorMessageSuffix);
        return false;
    }
}
