package org.apache.pinot.plugin.stream.pulsar;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.plugin.stream.pulsar.PulsarMessageBatchTest;
import org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pulsar.client.api.MessageId;
import org.bouncycastle.util.encoders.Base64;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarUtilsTest.class */
/**
 * Unit tests for {@link PulsarUtils}.
 *
 * <p>Each test feeds a {@link PulsarMessageBatchTest.DummyPulsarMessage} with key {@code "key"} and value
 * {@code "value"} (both UTF-8 encoded) through {@link PulsarUtils}, using a Mockito mock of
 * {@link PulsarConfig} to select the behavior under test.
 */
public class PulsarUtilsTest {

    /** Returns a fresh UTF-8 encoding of the message key used by every test. */
    private static byte[] keyBytes() {
        return "key".getBytes(StandardCharsets.UTF_8);
    }

    /** Returns a fresh UTF-8 encoding of the message value used by every test. */
    private static byte[] valueBytes() {
        return "value".getBytes(StandardCharsets.UTF_8);
    }

    /** Builds the dummy Pulsar message (key {@code "key"}, value {@code "value"}) shared by the tests. */
    private static PulsarMessageBatchTest.DummyPulsarMessage newDummyMessage() {
        return new PulsarMessageBatchTest.DummyPulsarMessage(keyBytes(), valueBytes());
    }

    /**
     * Verifies that {@link PulsarUtils#extractMessageMetadata} copies the message properties into the headers
     * and exposes the requested metadata fields (message id, base64 message id bytes, message key) in the
     * record metadata.
     *
     * @throws Exception if decoding the message id from its byte representation fails
     */
    @Test
    public void testExtractProperty() throws Exception {
        PulsarConfig config = Mockito.mock(PulsarConfig.class);
        Mockito.when(config.isPopulateMetadata()).thenReturn(true);
        Mockito.when(config.getMetadataFields()).thenReturn(Set.of(
                PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID,
                PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64,
                PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY));

        PulsarMessageBatchTest.DummyPulsarMessage message = newDummyMessage();
        message.getProperties().put("test_key", "test_value");
        message.getProperties().put("test_key2", "2");

        StreamMessageMetadata metadata = PulsarUtils.extractMessageMetadata(message, config);

        // Message properties are expected to surface as headers.
        GenericRow headers = metadata.getHeaders();
        Assert.assertNotNull(headers);
        Assert.assertEquals(headers.getValue("test_key"), "test_value");
        Assert.assertEquals(headers.getValue("test_key2"), "2");

        // The configured metadata fields are expected to surface as record metadata.
        Map<String, String> recordMetadata = metadata.getRecordMetadata();
        Assert.assertNotNull(recordMetadata);
        Assert.assertEquals(
                recordMetadata.get(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_KEY.getKey()),
                "key");
        Assert.assertEquals(
                recordMetadata.get(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID.getKey()),
                message.getMessageId().toString());

        // TestNG's assertEquals takes (actual, expected): the decoded id is the value under test.
        String encodedMessageId =
                recordMetadata.get(PulsarStreamMessageMetadata.PulsarMessageMetadataValue.MESSAGE_ID_BYTES_B64.getKey());
        MessageId decodedMessageId = MessageId.fromByteArray(Base64.decode(encodedMessageId));
        Assert.assertEquals(decodedMessageId, MessageId.earliest);
    }

    /**
     * Verifies that with key/value stitching disabled, the built stream message carries the original key and
     * the original value unchanged.
     */
    @Test
    public void testPulsarSteamMessageUnstitched() {
        PulsarConfig config = Mockito.mock(PulsarConfig.class);
        Mockito.when(config.getEnableKeyValueStitch()).thenReturn(false);

        BytesStreamMessage streamMessage = PulsarUtils.buildPulsarStreamMessage(newDummyMessage(), config);

        Assert.assertEquals(streamMessage.getKey(), keyBytes());
        Assert.assertEquals(streamMessage.getValue(), valueBytes());
    }

    /**
     * Verifies that with key/value stitching enabled, the built stream message carries the original key and a
     * value equal to {@link PulsarUtils#stitchKeyValue} applied to the same key and value.
     */
    @Test
    public void testPulsarSteamMessageStitched() {
        PulsarConfig config = Mockito.mock(PulsarConfig.class);
        Mockito.when(config.getEnableKeyValueStitch()).thenReturn(true);

        byte[] expectedValue = PulsarUtils.stitchKeyValue(keyBytes(), valueBytes());
        BytesStreamMessage streamMessage = PulsarUtils.buildPulsarStreamMessage(newDummyMessage(), config);

        Assert.assertEquals(streamMessage.getKey(), keyBytes());
        Assert.assertEquals(streamMessage.getValue(), expectedValue);
    }
}
