package org.apache.pinot.plugin.stream.kafka30;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pinot.plugin.stream.kafka30.utils.MiniKafkaCluster;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.class */
public class KafkaPartitionLevelConsumerTest {
    private static final long STABILIZE_SLEEP_DELAYS = 3000;
    private static final String TEST_TOPIC_1 = "foo";
    private static final String TEST_TOPIC_2 = "bar";
    private static final String TEST_TOPIC_3 = "expired";
    private static final int NUM_MSG_PRODUCED_PER_PARTITION = 1000;
    private static final long TIMESTAMP = Instant.now().toEpochMilli();
    private MiniKafkaCluster _kafkaCluster;
    private String _kafkaBrokerAddress;

    @BeforeClass
    public void setUp() throws Exception {
        this._kafkaCluster = new MiniKafkaCluster("0");
        this._kafkaCluster.start();
        this._kafkaBrokerAddress = this._kafkaCluster.getKafkaServerAddress();
        this._kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
        this._kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
        this._kafkaCluster.createTopic(TEST_TOPIC_3, 1, 1);
        Thread.sleep(STABILIZE_SLEEP_DELAYS);
        produceMsgToKafka();
        Thread.sleep(STABILIZE_SLEEP_DELAYS);
        this._kafkaCluster.deleteRecordsBeforeOffset(TEST_TOPIC_3, 0, 200L);
    }

    private void produceMsgToKafka() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this._kafkaBrokerAddress);
        properties.put("client.id", "clientId");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        for (int i = 0; i < NUM_MSG_PRODUCED_PER_PARTITION; i++) {
            try {
                kafkaProducer.send(new ProducerRecord(TEST_TOPIC_1, 0, Long.valueOf(TIMESTAMP + i), (Object) null, "sample_msg_" + i));
                kafkaProducer.send(new ProducerRecord(TEST_TOPIC_2, 0, Long.valueOf(TIMESTAMP + i), (Object) null, "sample_msg_" + i));
                kafkaProducer.send(new ProducerRecord(TEST_TOPIC_2, 1, Long.valueOf(TIMESTAMP + i), (Object) null, "sample_msg_" + i));
                kafkaProducer.send(new ProducerRecord(TEST_TOPIC_3, "sample_msg_" + i));
            } catch (Throwable th) {
                try {
                    kafkaProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        kafkaProducer.flush();
        kafkaProducer.close();
    }

    @AfterClass
    public void tearDown() throws Exception {
        try {
            this._kafkaCluster.deleteTopic(TEST_TOPIC_1);
            this._kafkaCluster.deleteTopic(TEST_TOPIC_2);
            this._kafkaCluster.deleteTopic(TEST_TOPIC_3);
        } finally {
            this._kafkaCluster.close();
        }
    }

    @Test
    public void testBuildConsumer() {
        String str = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", "theTopic");
        hashMap.put("stream.kafka.broker.list", str);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        hashMap.put("stream.kafka.fetcher.size", "10000");
        hashMap.put("stream.kafka.fetcher.minBytes", "20000");
        KafkaPartitionLevelConsumer kafkaPartitionLevelConsumer = new KafkaPartitionLevelConsumer("clientId", new StreamConfig("tableName_REALTIME", hashMap), 0);
        kafkaPartitionLevelConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
        Assert.assertEquals(512000, kafkaPartitionLevelConsumer.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
        Assert.assertEquals(10000, kafkaPartitionLevelConsumer.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
        Assert.assertEquals(10000, kafkaPartitionLevelConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
        Assert.assertEquals(20000, kafkaPartitionLevelConsumer.getKafkaPartitionLevelStreamConfig().getKafkaFetcherMinBytes());
        hashMap.put("stream.kafka.buffer.size", "100");
        hashMap.put("stream.kafka.socket.timeout", "1000");
        KafkaPartitionLevelConsumer kafkaPartitionLevelConsumer2 = new KafkaPartitionLevelConsumer("clientId", new StreamConfig("tableName_REALTIME", hashMap), 0);
        kafkaPartitionLevelConsumer2.fetchMessages(new LongMsgOffset(12345L), 10000);
        Assert.assertEquals(100, kafkaPartitionLevelConsumer2.getKafkaPartitionLevelStreamConfig().getKafkaBufferSize());
        Assert.assertEquals(NUM_MSG_PRODUCED_PER_PARTITION, kafkaPartitionLevelConsumer2.getKafkaPartitionLevelStreamConfig().getKafkaSocketTimeout());
    }

    @Test
    public void testGetPartitionCount() {
        String str = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
        hashMap.put("stream.kafka.broker.list", str);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        Assert.assertEquals(new KafkaStreamMetadataProvider("clientId", new StreamConfig("tableName_REALTIME", hashMap)).fetchPartitionCount(10000L), 1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("streamType", "kafka");
        hashMap2.put("stream.kafka.topic.name", TEST_TOPIC_2);
        hashMap2.put("stream.kafka.broker.list", str);
        hashMap2.put("stream.kafka.consumer.type", "simple");
        hashMap2.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap2.put("stream.kafka.decoder.class.name", "decoderClass");
        Assert.assertEquals(new KafkaStreamMetadataProvider("clientId", new StreamConfig("tableName_REALTIME", hashMap2)).fetchPartitionCount(10000L), 2);
    }

    @Test
    public void testFetchMessages() {
        String str = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", "theTopic");
        hashMap.put("stream.kafka.broker.list", str);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        new KafkaPartitionLevelConsumer("clientId", new StreamConfig("tableName_REALTIME", hashMap), 0).fetchMessages(new LongMsgOffset(12345L), 10000);
    }

    @Test
    public void testFetchOffsets() {
        testFetchOffsets(TEST_TOPIC_1);
        testFetchOffsets(TEST_TOPIC_2);
    }

    private void testFetchOffsets(String str) {
        String str2 = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", str);
        hashMap.put("stream.kafka.broker.list", str2);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        StreamConfig streamConfig = new StreamConfig("tableName_REALTIME", hashMap);
        int fetchPartitionCount = new KafkaStreamMetadataProvider("clientId", streamConfig).fetchPartitionCount(10000L);
        for (int i = 0; i < fetchPartitionCount; i++) {
            KafkaStreamMetadataProvider kafkaStreamMetadataProvider = new KafkaStreamMetadataProvider("clientId", streamConfig, i);
            Assert.assertEquals(new LongMsgOffset(0L).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000L)), 0);
            Assert.assertEquals(new LongMsgOffset(0L).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsPeriod("2d"), 10000L)), 0);
            Assert.assertEquals(new LongMsgOffset(1000L).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetAsTimestamp(Instant.now().toString()), 10000L)), 0);
            Assert.assertEquals(new LongMsgOffset(1000L).compareTo(kafkaStreamMetadataProvider.fetchStreamPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000L)), 0);
        }
    }

    @Test
    public void testConsumer() throws Exception {
        testConsumer(TEST_TOPIC_1);
        testConsumer(TEST_TOPIC_2);
    }

    private void testConsumer(String str) throws TimeoutException {
        String str2 = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", str);
        hashMap.put("stream.kafka.broker.list", str2);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        StreamConfig streamConfig = new StreamConfig("tableName_REALTIME", hashMap);
        StreamConsumerFactory create = StreamConsumerFactoryProvider.create(streamConfig);
        int fetchPartitionCount = new KafkaStreamMetadataProvider("clientId", streamConfig).fetchPartitionCount(10000L);
        for (int i = 0; i < fetchPartitionCount; i++) {
            PartitionGroupConsumer createPartitionGroupConsumer = create.createPartitionGroupConsumer("clientId", new PartitionGroupConsumptionStatus(i, 0, new LongMsgOffset(0L), new LongMsgOffset(1000L), "CONSUMING"));
            MessageBatch fetchMessages = createPartitionGroupConsumer.fetchMessages(new LongMsgOffset(0L), 10000);
            Assert.assertEquals(fetchMessages.getMessageCount(), 500);
            Assert.assertEquals(fetchMessages.getUnfilteredMessageCount(), 500);
            for (int i2 = 0; i2 < 500; i2++) {
                StreamMessage streamMessage = fetchMessages.getStreamMessage(i2);
                Assert.assertEquals(new String((byte[]) streamMessage.getValue()), "sample_msg_" + i2);
                StreamMessageMetadata metadata = streamMessage.getMetadata();
                Assert.assertNotNull(metadata);
                Assert.assertEquals(metadata.getRecordIngestionTimeMs(), TIMESTAMP + i2);
                LongMsgOffset offset = metadata.getOffset();
                Assert.assertTrue(offset instanceof LongMsgOffset);
                Assert.assertEquals(offset.getOffset(), i2);
                LongMsgOffset nextOffset = metadata.getNextOffset();
                Assert.assertTrue(nextOffset instanceof LongMsgOffset);
                Assert.assertEquals(nextOffset.getOffset(), i2 + 1);
            }
            Assert.assertEquals(fetchMessages.getOffsetOfNextBatch().toString(), "500");
            Assert.assertEquals(fetchMessages.getFirstMessageOffset().toString(), "0");
            Assert.assertEquals(fetchMessages.getLastMessageMetadata().getOffset().toString(), "499");
            Assert.assertEquals(fetchMessages.getLastMessageMetadata().getNextOffset().toString(), "500");
            MessageBatch fetchMessages2 = createPartitionGroupConsumer.fetchMessages(new LongMsgOffset(500L), 10000);
            Assert.assertEquals(fetchMessages2.getMessageCount(), 500);
            Assert.assertEquals(fetchMessages2.getUnfilteredMessageCount(), 500);
            for (int i3 = 0; i3 < 500; i3++) {
                StreamMessage streamMessage2 = fetchMessages2.getStreamMessage(i3);
                Assert.assertEquals(new String((byte[]) streamMessage2.getValue()), "sample_msg_" + (500 + i3));
                StreamMessageMetadata metadata2 = streamMessage2.getMetadata();
                Assert.assertNotNull(metadata2);
                Assert.assertEquals(metadata2.getRecordIngestionTimeMs(), TIMESTAMP + 500 + i3);
                LongMsgOffset offset2 = metadata2.getOffset();
                Assert.assertTrue(offset2 instanceof LongMsgOffset);
                Assert.assertEquals(offset2.getOffset(), 500 + i3);
                LongMsgOffset nextOffset2 = metadata2.getNextOffset();
                Assert.assertTrue(nextOffset2 instanceof LongMsgOffset);
                Assert.assertEquals(nextOffset2.getOffset(), 501 + i3);
            }
            Assert.assertEquals(fetchMessages2.getOffsetOfNextBatch().toString(), "1000");
            Assert.assertEquals(fetchMessages2.getFirstMessageOffset().toString(), "500");
            Assert.assertEquals(fetchMessages2.getLastMessageMetadata().getOffset().toString(), "999");
            Assert.assertEquals(fetchMessages2.getLastMessageMetadata().getNextOffset().toString(), "1000");
            MessageBatch fetchMessages3 = createPartitionGroupConsumer.fetchMessages(new LongMsgOffset(10L), 10000);
            Assert.assertEquals(fetchMessages3.getMessageCount(), 500);
            Assert.assertEquals(fetchMessages3.getUnfilteredMessageCount(), 500);
            for (int i4 = 0; i4 < 500; i4++) {
                StreamMessage streamMessage3 = fetchMessages3.getStreamMessage(i4);
                Assert.assertEquals(new String((byte[]) streamMessage3.getValue()), "sample_msg_" + (10 + i4));
                StreamMessageMetadata metadata3 = streamMessage3.getMetadata();
                Assert.assertNotNull(metadata3);
                Assert.assertEquals(metadata3.getRecordIngestionTimeMs(), TIMESTAMP + 10 + i4);
                LongMsgOffset offset3 = metadata3.getOffset();
                Assert.assertTrue(offset3 instanceof LongMsgOffset);
                Assert.assertEquals(offset3.getOffset(), 10 + i4);
                LongMsgOffset nextOffset3 = metadata3.getNextOffset();
                Assert.assertTrue(nextOffset3 instanceof LongMsgOffset);
                Assert.assertEquals(nextOffset3.getOffset(), 11 + i4);
            }
            Assert.assertEquals(fetchMessages3.getOffsetOfNextBatch().toString(), "510");
            Assert.assertEquals(fetchMessages3.getFirstMessageOffset().toString(), "10");
            Assert.assertEquals(fetchMessages3.getLastMessageMetadata().getOffset().toString(), "509");
            Assert.assertEquals(fetchMessages3.getLastMessageMetadata().getNextOffset().toString(), "510");
            MessageBatch fetchMessages4 = createPartitionGroupConsumer.fetchMessages(new LongMsgOffset(610L), 10000);
            Assert.assertEquals(fetchMessages4.getMessageCount(), 390);
            Assert.assertEquals(fetchMessages4.getUnfilteredMessageCount(), 390);
            for (int i5 = 0; i5 < 390; i5++) {
                StreamMessage streamMessage4 = fetchMessages4.getStreamMessage(i5);
                Assert.assertEquals(new String((byte[]) streamMessage4.getValue()), "sample_msg_" + (610 + i5));
                StreamMessageMetadata metadata4 = streamMessage4.getMetadata();
                Assert.assertNotNull(metadata4);
                Assert.assertEquals(metadata4.getRecordIngestionTimeMs(), TIMESTAMP + 610 + i5);
                LongMsgOffset offset4 = metadata4.getOffset();
                Assert.assertTrue(offset4 instanceof LongMsgOffset);
                Assert.assertEquals(offset4.getOffset(), 610 + i5);
                LongMsgOffset nextOffset4 = metadata4.getNextOffset();
                Assert.assertTrue(nextOffset4 instanceof LongMsgOffset);
                Assert.assertEquals(nextOffset4.getOffset(), 611 + i5);
            }
            Assert.assertEquals(fetchMessages4.getOffsetOfNextBatch().toString(), "1000");
            Assert.assertEquals(fetchMessages4.getFirstMessageOffset().toString(), "610");
            Assert.assertEquals(fetchMessages4.getLastMessageMetadata().getOffset().toString(), "999");
            Assert.assertEquals(fetchMessages4.getLastMessageMetadata().getNextOffset().toString(), "1000");
        }
    }

    protected String getKafkaConsumerFactoryName() {
        return KafkaConsumerFactory.class.getName();
    }

    @Test
    public void testOffsetsExpired() throws TimeoutException {
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", TEST_TOPIC_3);
        hashMap.put("stream.kafka.broker.list", this._kafkaBrokerAddress);
        hashMap.put("stream.kafka.consumer.type", "lowlevel");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        hashMap.put("auto.offset.reset", "earliest");
        MessageBatch fetchMessages = StreamConsumerFactoryProvider.create(new StreamConfig("tableName_REALTIME", hashMap)).createPartitionGroupConsumer("clientId", new PartitionGroupConsumptionStatus(0, 0, new LongMsgOffset(0L), new LongMsgOffset(1000L), "CONSUMING")).fetchMessages(new LongMsgOffset(0L), 10000);
        Assert.assertEquals(fetchMessages.getMessageCount(), 500);
        Assert.assertEquals(fetchMessages.getUnfilteredMessageCount(), 500);
        for (int i = 0; i < 500; i++) {
            Assert.assertEquals(new String((byte[]) fetchMessages.getStreamMessage(i).getValue()), "sample_msg_" + (200 + i));
        }
        Assert.assertEquals(fetchMessages.getOffsetOfNextBatch().toString(), "700");
    }

    @Test
    public void testListTopics() {
        String str = this._kafkaBrokerAddress;
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC");
        hashMap.put("stream.kafka.broker.list", str);
        hashMap.put("stream.kafka.consumer.type", "simple");
        hashMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName());
        hashMap.put("stream.kafka.decoder.class.name", "decoderClass");
        Assert.assertTrue(((List) new KafkaStreamMetadataProvider("clientId", new StreamConfig("tableName_REALTIME", hashMap)).getTopics().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList())).containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
    }
}
