package org.apache.pinot.plugin.stream.pulsar;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.class */
/**
 * Integration test for {@link PulsarPartitionLevelConsumer} and {@link PulsarStreamMetadataProvider}.
 *
 * <p>{@link #setUp()} starts a Pulsar container, creates two partitioned topics and publishes
 * {@link #NUM_RECORDS_PER_PARTITION} messages into every partition of each topic: one topic with
 * producer batching disabled, the other with batching enabled. The message ids returned by the
 * producers are recorded per partition and later compared with the offsets reported by the consumer.
 */
public class PulsarConsumerTest {
    public static final String TABLE_NAME_WITH_TYPE = "tableName_REALTIME";
    public static final String TEST_TOPIC = "test-topic";
    public static final String TEST_TOPIC_BATCH = "test-topic-batch";
    public static final String MESSAGE_PREFIX = "sample_msg_";
    public static final String CLIENT_ID = "clientId";
    public static final int NUM_PARTITIONS = 2;
    public static final int NUM_RECORDS_PER_PARTITION = 1000;
    public static final int BATCH_SIZE = 10;
    public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(1);

    private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:3.2.2");

    /** Message ids published to {@link #TEST_TOPIC}; outer index is the partition, inner index the record number. */
    private final List<List<MessageId>> _partitionToMessageIdMapping = new ArrayList<>(NUM_PARTITIONS);

    /** Message ids published to {@link #TEST_TOPIC_BATCH}; same layout as the non-batch mapping. */
    private final List<List<MessageId>> _partitionToMessageIdMappingBatch = new ArrayList<>(NUM_PARTITIONS);

    private PulsarContainer _pulsar;

    /**
     * Starts the Pulsar container, creates both partitioned topics and publishes the test records.
     *
     * @throws Exception if the container, the admin client or the producer client fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        _pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5L));
        _pulsar.start();

        // The admin client is only needed for topic creation, so it is closed (exactly once)
        // before the producer client is opened.
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build()) {
            Topics topics = admin.topics();
            topics.createPartitionedTopic(TEST_TOPIC, NUM_PARTITIONS);
            topics.createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITIONS);
        }

        try (PulsarClient client = PulsarClient.builder().serviceUrl(_pulsar.getPulsarBrokerUrl()).build()) {
            publishRecords(client);
            publishRecordsBatch(client);
        }
    }

    /**
     * Stops the Pulsar container.
     *
     * <p>{@code alwaysRun} makes TestNG invoke this even when {@link #setUp()} failed, and the null
     * guard covers the case where the failure happened before the container field was assigned.
     *
     * @throws Exception if stopping the container fails
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() throws Exception {
        if (_pulsar != null) {
            _pulsar.stop();
        }
    }

    /**
     * Publishes {@link #NUM_RECORDS_PER_PARTITION} unbatched messages to every partition of
     * {@link #TEST_TOPIC} and records the resulting message ids per partition.
     *
     * @param pulsarClient client used to create the producers
     * @throws Exception if producing or closing a producer fails
     */
    public void publishRecords(PulsarClient pulsarClient) throws Exception {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(TEST_TOPIC)
                    .messageRouter(fixedPartitionRouter(partition))
                    .enableBatching(false)
                    .create()) {
                _partitionToMessageIdMapping.add(sendAndCollect(producer, false));
            }
        }
    }

    /**
     * Publishes {@link #NUM_RECORDS_PER_PARTITION} batched messages (at most {@link #BATCH_SIZE} per
     * batch) to every partition of {@link #TEST_TOPIC_BATCH} and records the resulting message ids
     * per partition.
     *
     * @param pulsarClient client used to create the producers
     * @throws Exception if producing or closing a producer fails
     */
    public void publishRecordsBatch(PulsarClient pulsarClient) throws Exception {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(TEST_TOPIC_BATCH)
                    .messageRouter(fixedPartitionRouter(partition))
                    .batchingMaxMessages(BATCH_SIZE)
                    .batchingMaxPublishDelay(1L, TimeUnit.SECONDS)
                    .create()) {
                _partitionToMessageIdMappingBatch.add(sendAndCollect(producer, true));
            }
        }
    }

    /**
     * Builds the stream configuration pointing at the test container for the given topic.
     *
     * @param str name of the Pulsar topic to consume
     * @return stream config for table {@link #TABLE_NAME_WITH_TYPE}
     */
    public StreamConfig getStreamConfig(String str) {
        Map<String, String> streamConfigMap = new HashMap<>();
        streamConfigMap.put("streamType", PulsarConfigTest.STREAM_TYPE);
        streamConfigMap.put("stream.pulsar.topic.name", str);
        streamConfigMap.put("stream.pulsar.bootstrap.servers", _pulsar.getPulsarBrokerUrl());
        streamConfigMap.put("stream.pulsar.serviceHttpUrl", _pulsar.getHttpServiceUrl());
        streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
        streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
        streamConfigMap.put("stream.pulsar.decoder.class.name", "dummy");
        return new StreamConfig(TABLE_NAME_WITH_TYPE, streamConfigMap);
    }

    /** Consumes every partition of the unbatched topic and checks payloads and offsets. */
    @Test
    public void testPartitionLevelConsumer() throws Exception {
        verifyPartitionLevelConsumption(TEST_TOPIC, _partitionToMessageIdMapping);
    }

    /** Consumes every partition of the batched topic and checks payloads and offsets. */
    @Test
    public void testPartitionLevelConsumerBatchMessages() throws Exception {
        verifyPartitionLevelConsumption(TEST_TOPIC_BATCH, _partitionToMessageIdMappingBatch);
    }

    /** Checks the number of topics reported by the metadata provider. */
    @Test
    public void testGetTopics() throws Exception {
        try (PulsarStreamMetadataProvider metadataProvider =
                new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig("NON_EXISTING_TOPIC"))) {
            List<String> topicNames = metadataProvider.getTopics().stream()
                    .map(topic -> topic.getName())
                    .collect(Collectors.toList());
            // assertEquals reports the actual count on failure, unlike assertTrue(size == 4).
            Assert.assertEquals(topicNames.size(), 4);
        }
    }

    /**
     * Returns a router that sends every message to the given partition.
     *
     * <p>Kept as an anonymous class in a static method so it does not capture the test instance.
     */
    private static MessageRouter fixedPartitionRouter(final int partition) {
        return new MessageRouter() {
            @Override
            public int choosePartition(Message<?> message, TopicMetadata topicMetadata) {
                return partition;
            }
        };
    }

    /**
     * Sends {@link #NUM_RECORDS_PER_PARTITION} messages asynchronously, flushes, and returns the
     * acknowledged message ids in send order.
     *
     * @param producer producer to publish with
     * @param expectBatchIds whether every returned id must be a {@link BatchMessageIdImpl}
     */
    private static List<MessageId> sendAndCollect(Producer<String> producer, boolean expectBatchIds)
            throws Exception {
        List<Future<MessageId>> pending = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
            pending.add(producer.sendAsync(MESSAGE_PREFIX + i));
        }
        producer.flush();

        List<MessageId> messageIds = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
        for (Future<MessageId> future : pending) {
            MessageId messageId = future.get();
            Assert.assertEquals(messageId instanceof BatchMessageIdImpl, expectBatchIds,
                    "Unexpected message id type: " + messageId.getClass().getName());
            messageIds.add(messageId);
        }
        return messageIds;
    }

    /**
     * Verifies the partition count of {@code topic}, then consumes each partition twice: once from
     * the earliest offset and once starting at the middle record.
     *
     * @param topic topic to consume
     * @param expectedIdsByPartition message ids recorded at publish time, indexed by partition
     */
    private void verifyPartitionLevelConsumption(String topic, List<List<MessageId>> expectedIdsByPartition)
            throws Exception {
        StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(getStreamConfig(topic));

        // The provider is only needed for the partition-count check; scoping it here closes it exactly once.
        try (PulsarStreamMetadataProvider metadataProvider =
                new PulsarStreamMetadataProvider(CLIENT_ID, getStreamConfig(topic))) {
            Assert.assertEquals(metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS), NUM_PARTITIONS);
        }

        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            List<MessageId> expectedIds = expectedIdsByPartition.get(partition);
            PartitionGroupConsumptionStatus status = new PartitionGroupConsumptionStatus(partition, 0,
                    new MessageIdStreamOffset(MessageId.earliest), (StreamPartitionMsgOffset) null, "CONSUMING");
            try (PulsarPartitionLevelConsumer consumer =
                    (PulsarPartitionLevelConsumer) consumerFactory.createPartitionGroupConsumer(CLIENT_ID, status)) {
                testConsumer(consumer, 0, expectedIds);
                testConsumer(consumer, NUM_RECORDS_PER_PARTITION / 2, expectedIds);
            }
        }
    }

    /**
     * Fetches from {@code consumer} until {@link #NUM_RECORDS_PER_PARTITION} records have been seen,
     * verifying each message against the ids recorded at publish time.
     *
     * @param consumer consumer under test
     * @param startIndex index of the first expected record; {@code 0} starts from {@code MessageId.earliest}
     * @param expectedIds message ids of the partition in publish order
     */
    private void testConsumer(PulsarPartitionLevelConsumer consumer, int startIndex, List<MessageId> expectedIds) {
        MessageId startMessageId = startIndex == 0 ? MessageId.earliest : expectedIds.get(startIndex);
        int consumed = startIndex;
        while (consumed < NUM_RECORDS_PER_PARTITION) {
            PulsarMessageBatch batch =
                    consumer.fetchMessages(new MessageIdStreamOffset(startMessageId), CONSUMER_FETCH_TIMEOUT_MILLIS);
            int messageCount = batch.getMessageCount();
            Assert.assertFalse(batch.isEndOfPartitionGroup());
            for (int i = 0; i < messageCount; i++) {
                verifyMessage(batch.getStreamMessage(i), consumed + i, expectedIds);
            }
            consumed += messageCount;
            // The next fetch resumes at the first record not yet seen.
            if (consumed < NUM_RECORDS_PER_PARTITION) {
                startMessageId = expectedIds.get(consumed);
            }
        }
        Assert.assertEquals(consumed, NUM_RECORDS_PER_PARTITION);
    }

    /**
     * Checks payload, offset and next offset of a single consumed message.
     *
     * @param message consumed message
     * @param index expected position of the message within its partition
     * @param expectedIds message ids of the partition in publish order
     */
    private void verifyMessage(BytesStreamMessage message, int index, List<MessageId> expectedIds) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        Assert.assertEquals(new String((byte[]) message.getValue(), StandardCharsets.UTF_8), MESSAGE_PREFIX + index);

        StreamMessageMetadata metadata = message.getMetadata();
        Assert.assertNotNull(metadata);
        // Explicit casts: redundant if the getters already return MessageIdStreamOffset,
        // required if they are declared with a supertype.
        MessageIdStreamOffset offset = (MessageIdStreamOffset) metadata.getOffset();
        Assert.assertNotNull(offset);
        MessageIdStreamOffset nextOffset = (MessageIdStreamOffset) metadata.getNextOffset();
        Assert.assertNotNull(nextOffset);

        Assert.assertEquals(offset.getMessageId(), expectedIds.get(index));
        // The last record has no successor in the recorded ids, so its next offset is not compared.
        if (index < NUM_RECORDS_PER_PARTITION - 1) {
            Assert.assertEquals(nextOffset.getMessageId(), expectedIds.get(index + 1));
        }
    }
}
