package org.apache.pinot.plugin.stream.kinesis;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.class */
public class KinesisStreamMetadataProviderTest {
    private static final String STREAM_NAME = "kinesis-test";
    private static final String AWS_REGION = "us-west-2";
    private static final String SHARD_ID_0 = "0";
    private static final String SHARD_ID_1 = "1";
    private static final String CLIENT_ID = "dummy";
    private static final int TIMEOUT = 1000;
    private KinesisConnectionHandler _kinesisConnectionHandler;
    private KinesisStreamMetadataProvider _kinesisStreamMetadataProvider;
    private StreamConsumerFactory _streamConsumerFactory;
    private PartitionGroupConsumer _partitionGroupConsumer;

    private StreamConfig getStreamConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("region", AWS_REGION);
        hashMap.put("maxRecordsToFetch", "10");
        hashMap.put("shardIteratorType", ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
        hashMap.put("streamType", "kinesis");
        hashMap.put("stream.kinesis.topic.name", STREAM_NAME);
        hashMap.put("stream.kinesis.decoder.class.name", "ABCD");
        hashMap.put("stream.kinesis.consumer.factory.class.name", "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory");
        return new StreamConfig("", hashMap);
    }

    @BeforeMethod
    public void setupTest() {
        this._kinesisConnectionHandler = (KinesisConnectionHandler) Mockito.mock(KinesisConnectionHandler.class);
        this._streamConsumerFactory = (StreamConsumerFactory) Mockito.mock(StreamConsumerFactory.class);
        this._partitionGroupConsumer = (PartitionGroupConsumer) Mockito.mock(PartitionGroupConsumer.class);
        this._kinesisStreamMetadataProvider = new KinesisStreamMetadataProvider(CLIENT_ID, getStreamConfig(), this._kinesisConnectionHandler, this._streamConsumerFactory);
    }

    @Test
    public void getPartitionsGroupInfoListTest() throws Exception {
        Mockito.when(this._kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of((Shard) Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build(), (Shard) Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build()));
        List computePartitionGroupMetadata = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), new ArrayList(), TIMEOUT);
        Assert.assertEquals(computePartitionGroupMetadata.size(), 2);
        Assert.assertEquals(((PartitionGroupMetadata) computePartitionGroupMetadata.get(0)).getPartitionGroupId(), 0);
        Assert.assertEquals(((PartitionGroupMetadata) computePartitionGroupMetadata.get(1)).getPartitionGroupId(), 1);
    }

    @Test
    public void fetchStreamPartitionOffsetTest() {
        Mockito.when(this._kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of((Shard) Shard.builder().shardId("shardId-0").sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).endingSequenceNumber("100").build()).build(), (Shard) Shard.builder().shardId("shardId-1").sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber("2").endingSequenceNumber("200").build()).build()));
        KinesisStreamMetadataProvider kinesisStreamMetadataProvider = new KinesisStreamMetadataProvider(CLIENT_ID, getStreamConfig(), SHARD_ID_0, this._kinesisConnectionHandler, this._streamConsumerFactory);
        Assert.assertEquals(kinesisStreamMetadataProvider.fetchPartitionCount(1000L), 2);
        KinesisPartitionGroupOffset fetchStreamPartitionOffset = kinesisStreamMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 1000L);
        Assert.assertEquals(fetchStreamPartitionOffset.getShardId(), "shardId-0");
        Assert.assertEquals(fetchStreamPartitionOffset.getSequenceNumber(), SHARD_ID_1);
        KinesisPartitionGroupOffset fetchStreamPartitionOffset2 = kinesisStreamMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 1000L);
        Assert.assertEquals(fetchStreamPartitionOffset2.getShardId(), "shardId-0");
        Assert.assertEquals(fetchStreamPartitionOffset2.getSequenceNumber(), "100");
        KinesisStreamMetadataProvider kinesisStreamMetadataProvider2 = new KinesisStreamMetadataProvider(CLIENT_ID, getStreamConfig(), SHARD_ID_1, this._kinesisConnectionHandler, this._streamConsumerFactory);
        Assert.assertEquals(kinesisStreamMetadataProvider2.fetchPartitionCount(1000L), 2);
        KinesisPartitionGroupOffset fetchStreamPartitionOffset3 = kinesisStreamMetadataProvider2.fetchStreamPartitionOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 1000L);
        Assert.assertEquals(fetchStreamPartitionOffset3.getShardId(), "shardId-1");
        Assert.assertEquals(fetchStreamPartitionOffset3.getSequenceNumber(), "2");
        KinesisPartitionGroupOffset fetchStreamPartitionOffset4 = kinesisStreamMetadataProvider2.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, 1000L);
        Assert.assertEquals(fetchStreamPartitionOffset4.getShardId(), "shardId-1");
        Assert.assertEquals(fetchStreamPartitionOffset4.getSequenceNumber(), "200");
    }

    @Test
    public void getPartitionsGroupInfoEndOfShardTest() throws Exception {
        ArrayList arrayList = new ArrayList();
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(SHARD_ID_0, SHARD_ID_1);
        arrayList.add(new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset, kinesisPartitionGroupOffset, "CONSUMING"));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(StreamPartitionMsgOffset.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(PartitionGroupConsumptionStatus.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.class);
        ArgumentCaptor forClass4 = ArgumentCaptor.forClass(String.class);
        Mockito.when(this._kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of((Shard) Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).endingSequenceNumber(SHARD_ID_1).build()).build(), (Shard) Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build()));
        Mockito.when(this._streamConsumerFactory.createPartitionGroupConsumer((String) forClass4.capture(), (PartitionGroupConsumptionStatus) forClass2.capture())).thenReturn(this._partitionGroupConsumer);
        Mockito.when(this._partitionGroupConsumer.fetchMessages((StreamPartitionMsgOffset) forClass.capture(), ((Integer) forClass3.capture()).intValue())).thenReturn(new KinesisMessageBatch(new ArrayList(), kinesisPartitionGroupOffset, true));
        List computePartitionGroupMetadata = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), arrayList, TIMEOUT);
        Assert.assertEquals(computePartitionGroupMetadata.size(), 1);
        Assert.assertEquals(((PartitionGroupMetadata) computePartitionGroupMetadata.get(0)).getPartitionGroupId(), 1);
        Assert.assertEquals(((PartitionGroupConsumptionStatus) forClass2.getValue()).getSequenceNumber(), 1);
    }

    @Test
    public void getPartitionsGroupInfoChildShardsest() throws Exception {
        ArrayList arrayList = new ArrayList();
        new HashMap().put(SHARD_ID_1, SHARD_ID_1);
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(SHARD_ID_1, SHARD_ID_1);
        arrayList.add(new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset, kinesisPartitionGroupOffset, "CONSUMING"));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(StreamPartitionMsgOffset.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(PartitionGroupConsumptionStatus.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Integer.class);
        ArgumentCaptor forClass4 = ArgumentCaptor.forClass(String.class);
        Mockito.when(this._kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of((Shard) Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).build()).build(), (Shard) Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange((SequenceNumberRange) SequenceNumberRange.builder().startingSequenceNumber(SHARD_ID_1).endingSequenceNumber(SHARD_ID_1).build()).build()));
        Mockito.when(this._streamConsumerFactory.createPartitionGroupConsumer((String) forClass4.capture(), (PartitionGroupConsumptionStatus) forClass2.capture())).thenReturn(this._partitionGroupConsumer);
        Mockito.when(this._partitionGroupConsumer.fetchMessages((StreamPartitionMsgOffset) forClass.capture(), ((Integer) forClass3.capture()).intValue())).thenReturn(new KinesisMessageBatch(new ArrayList(), kinesisPartitionGroupOffset, true));
        List computePartitionGroupMetadata = this._kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), arrayList, TIMEOUT);
        Assert.assertEquals(computePartitionGroupMetadata.size(), 1);
        Assert.assertEquals(((PartitionGroupMetadata) computePartitionGroupMetadata.get(0)).getPartitionGroupId(), 0);
        Assert.assertEquals(((PartitionGroupConsumptionStatus) forClass2.getValue()).getSequenceNumber(), 1);
    }
}
