package org.apache.pinot.plugin.stream.kinesis;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.apache.pinot.spi.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;

/**
 * Integration test for {@link KinesisConnectionHandler#getStreamNames()} that runs against a real
 * Kinesis endpoint.
 *
 * <p>The test is skipped unless {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and
 * {@code AWS_SECRET_ACCESS_KEY} are all set in the environment. Every stream it creates is named
 * with {@link #TEST_STREAM_PREFIX} followed by a random UUID and is deleted on a best-effort basis
 * when the test finishes, including when set-up or an assertion fails.
 */
public class KinesisConnectionHandlerIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConnectionHandlerIntegrationTest.class);
    private static final String TEST_STREAM_PREFIX = "pinot-test-stream-";
    /** Number of additional streams created by {@link #testGetStreamNamesWithPagination()}. */
    private static final int EXTRA_STREAM_COUNT = 3;
    private static final String AWS_REGION = System.getenv("AWS_REGION");
    private static final String AWS_ACCESS_KEY = System.getenv("AWS_ACCESS_KEY_ID");
    private static final String AWS_SECRET_KEY = System.getenv("AWS_SECRET_ACCESS_KEY");

    private KinesisConnectionHandler _connectionHandler;
    private String _testStreamName;
    private KinesisClient _kinesisClient;

    /**
     * Builds the connection handler from environment-provided credentials and creates the stream
     * shared by all tests in this class.
     *
     * @throws SkipException if the AWS credentials or region are not present in the environment
     */
    @BeforeClass
    public void setUp() {
        if (AWS_ACCESS_KEY == null || AWS_SECRET_KEY == null || AWS_REGION == null) {
            throw new SkipException("Skipping integration test because AWS credentials or region are not set in environment variables");
        }
        _testStreamName = newTestStreamName();

        HashMap<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", "kinesis");
        streamConfigMap.put("region", AWS_REGION);
        streamConfigMap.put("accessKey", AWS_ACCESS_KEY);
        streamConfigMap.put("secretKey", AWS_SECRET_KEY);
        streamConfigMap.put("stream.kinesis.topic.name", _testStreamName);
        streamConfigMap.put("stream.kinesis.decoder.class.name", "org.apache.pinot.plugin.stream.kinesis.KinesisMessageDecoder");

        _connectionHandler = new KinesisConnectionHandler(new KinesisConfig(new StreamConfig("testTable", streamConfigMap)));
        _kinesisClient = _connectionHandler.getClient();
        createTestStream();
    }

    /**
     * Creates the shared test stream and blocks until the SDK waiter reports that it exists.
     *
     * @throws RuntimeException wrapping any failure raised while creating or waiting for the stream
     */
    private void createTestStream() {
        try {
            createSingleShardStream(_testStreamName);
            LOGGER.info("Waiting for stream {} to become active", _testStreamName);
            waitForStream(_testStreamName);
            LOGGER.info("Stream {} is now active", _testStreamName);
        } catch (Exception e) {
            LOGGER.error("Failed to create test stream", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes the shared test stream and releases the Kinesis client.
     *
     * <p>{@code alwaysRun = true} makes TestNG invoke this even when {@link #setUp()} failed or was
     * skipped, so a stream created before a set-up failure is not left behind. The null checks cover
     * the case where set-up never got as far as creating the client.
     */
    @AfterClass(alwaysRun = true)
    public void cleanup() {
        if (_kinesisClient == null) {
            return;
        }
        try {
            if (_testStreamName != null) {
                deleteStreamQuietly(_testStreamName);
            }
        } finally {
            try {
                _kinesisClient.close();
            } catch (Exception e) {
                LOGGER.error("Failed to close Kinesis client", e);
            }
        }
    }

    /** Verifies that the shared test stream is reported by {@code getStreamNames()}. */
    @Test
    public void testGetStreamNames() {
        List<String> streamNames = _connectionHandler.getStreamNames();
        Assert.assertNotNull(streamNames);
        Assert.assertTrue(streamNames.contains(_testStreamName),
            "Expected to find test stream " + _testStreamName + " in list of streams: " + streamNames);
    }

    /**
     * Creates several additional streams and verifies that all of them, plus the shared stream, are
     * reported by {@code getStreamNames()}.
     *
     * <p>NOTE(review): whether this actually spans more than one result page depends on the page
     * size used by the handler and on how many streams already exist in the account; confirm.
     */
    @Test(dependsOnMethods = {"testGetStreamNames"})
    public void testGetStreamNamesWithPagination() {
        // Entries stay null until the corresponding stream has been created successfully, so the
        // finally block only tries to delete streams that really exist.
        String[] extraStreams = new String[EXTRA_STREAM_COUNT];
        try {
            for (int i = 0; i < extraStreams.length; i++) {
                String name = newTestStreamName();
                createSingleShardStream(name);
                extraStreams[i] = name;
            }
            for (String name : extraStreams) {
                waitForStream(name);
            }

            List<String> streamNames = _connectionHandler.getStreamNames();
            Assert.assertNotNull(streamNames);
            Assert.assertTrue(streamNames.contains(_testStreamName));
            for (String name : extraStreams) {
                Assert.assertTrue(streamNames.contains(name), "Expected to find stream: " + name);
            }
        } finally {
            // Best-effort: one failed delete must neither skip the remaining deletes nor mask an
            // assertion failure raised above.
            for (String name : extraStreams) {
                if (name != null) {
                    deleteStreamQuietly(name);
                }
            }
        }
    }

    /** Returns a fresh stream name made of {@link #TEST_STREAM_PREFIX} and a random UUID. */
    private static String newTestStreamName() {
        return TEST_STREAM_PREFIX + UUID.randomUUID();
    }

    /** Requests creation of a stream with a single shard; does not wait for it to exist. */
    private void createSingleShardStream(String streamName) {
        _kinesisClient.createStream(CreateStreamRequest.builder().streamName(streamName).shardCount(1).build());
    }

    /** Blocks until the SDK's "stream exists" waiter succeeds for the given stream. */
    private void waitForStream(String streamName) {
        _kinesisClient.waiter().waitUntilStreamExists(builder -> builder.streamName(streamName));
    }

    /** Deletes the given stream, logging instead of propagating any failure. */
    private void deleteStreamQuietly(String streamName) {
        try {
            _kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build());
            LOGGER.info("Deleted test stream: {}", streamName);
        } catch (Exception e) {
            LOGGER.error("Failed to delete test stream {}", streamName, e);
        }
    }
}
