package org.apache.pinot.plugin.inputformat.protobuf;

import com.google.protobuf.Message;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pinot.plugin.inputformat.protobuf.Sample;
import org.apache.pinot.plugin.inputformat.protobuf.kafka.schemaregistry.SchemaRegistryStarter;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/inputformat/protobuf/ProtoBufConfluentSchemaTest.class */
public class ProtoBufConfluentSchemaTest {
    public static final String TOPIC_PROTO = "test_topic_proto";
    SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
    private Producer<byte[], Message> _protoProducer;

    @BeforeClass
    public void setup() {
        this._schemaRegistry = SchemaRegistryStarter.startLocalInstance(9093);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this._schemaRegistry._kafkaContainer.getBootstrapServers());
        properties.put("schema.registry.url", this._schemaRegistry.getUrl());
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
        this._protoProducer = new KafkaProducer(properties);
    }

    @Test
    public void testSamplePinotConsumer() throws Exception {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            Sample.SampleRecord.Builder id = Sample.SampleRecord.newBuilder().addFriends(UUID.randomUUID().toString()).addFriends(UUID.randomUUID().toString()).setEmail(UUID.randomUUID().toString()).setName(UUID.randomUUID().toString()).setId(i);
            if (i % 2 == 0) {
                id.setOptionalField(UUID.randomUUID().toString());
            }
            Sample.SampleRecord m193build = id.m193build();
            this._protoProducer.send(new ProducerRecord(TOPIC_PROTO, m193build));
            arrayList.add(m193build);
        }
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this._schemaRegistry._kafkaContainer.getBootstrapServers());
        properties.put("schema.registry.url", this._schemaRegistry.getUrl());
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", "foo_bar");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC_PROTO));
        Iterator it = kafkaConsumer.poll(Duration.ofMillis(1000L)).iterator();
        KafkaConfluentSchemaRegistryProtoBufMessageDecoder kafkaConfluentSchemaRegistryProtoBufMessageDecoder = new KafkaConfluentSchemaRegistryProtoBufMessageDecoder(message -> {
            Assert.assertNull(message.getDescriptorForType().findFieldByName("optionalField").getRealContainingOneof(), "Received protobuf have been rewritten");
        });
        HashMap hashMap = new HashMap();
        hashMap.put("schema.registry.rest.url", this._schemaRegistry.getUrl());
        kafkaConfluentSchemaRegistryProtoBufMessageDecoder.init(hashMap, (Set) null, TOPIC_PROTO);
        GenericRow genericRow = new GenericRow();
        ArrayList arrayList2 = new ArrayList();
        while (it.hasNext()) {
            kafkaConfluentSchemaRegistryProtoBufMessageDecoder.decode((byte[]) ((ConsumerRecord) it.next()).value(), genericRow);
            arrayList2.add(genericRow.copy());
            genericRow.clear();
        }
        Assert.assertEquals(arrayList2.size(), 10);
        for (int i2 = 0; i2 < 10; i2++) {
            Sample.SampleRecord sampleRecord = (Sample.SampleRecord) arrayList.get(i2);
            GenericRow genericRow2 = (GenericRow) arrayList2.get(i2);
            Assert.assertEquals(genericRow2.getValue("name"), sampleRecord.getName(), "Unexpected 'name' value");
            Assert.assertEquals(genericRow2.getValue("id"), Integer.valueOf(sampleRecord.getId()), "Unexpected 'id' value");
            Assert.assertEquals(genericRow2.getValue("email"), sampleRecord.getEmail(), "Unexpected 'email' value");
            Assert.assertEquals(genericRow2.getValue("friends"), sampleRecord.mo160getFriendsList().asByteStringList().stream().map((v0) -> {
                return v0.toStringUtf8();
            }).toArray(i3 -> {
                return new Object[i3];
            }), "Unexpected 'friends' value");
            Assert.assertEquals(genericRow2.getValue("optionalField"), i2 % 2 == 0 ? sampleRecord.getOptionalField() : null, "Unexpected 'optionalField' value");
        }
    }

    @AfterClass
    public void tearDown() {
        this._schemaRegistry.stop();
    }
}
