package org.apache.pinot.plugin.stream.pulsar;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.plugin.stream.pulsar.PulsarStreamMessageMetadata;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarUtils.class */
/**
 * Static helpers used by the Pulsar stream plugin: translating offset criteria into a Pulsar
 * subscription position, wrapping Pulsar messages into {@link BytesStreamMessage}s, computing the
 * id that follows a given {@link MessageId}, and extracting message metadata.
 *
 * <p>The class holds no state and cannot be instantiated.
 */
public final class PulsarUtils {
    /** Zero-length array substituted for a missing key or payload when stitching. */
    private static final byte[] EMPTY_BYTES = new byte[0];

    private PulsarUtils() {
    }

    /**
     * Maps an {@link OffsetCriteria} onto the equivalent Pulsar subscription start position.
     *
     * @param offsetCriteria criteria to translate
     * @return {@code Latest} for a "largest" criteria, {@code Earliest} for a "smallest" criteria
     * @throws IllegalArgumentException if the criteria is neither largest nor smallest
     */
    public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) {
        if (offsetCriteria.isLargest()) {
            return SubscriptionInitialPosition.Latest;
        }
        if (offsetCriteria.isSmallest()) {
            return SubscriptionInitialPosition.Earliest;
        }
        throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
    }

    /**
     * Concatenates a key and a value into one length-prefixed byte array laid out as
     * {@code [int keyLength][key bytes][int valueLength][value bytes]}. The lengths are written
     * with {@link ByteBuffer#putInt(int)} using the buffer's default byte order.
     *
     * <p>A {@code null} key or value is encoded as a zero-length segment instead of failing with a
     * {@link NullPointerException}, so messages published without a key can still be stitched.
     *
     * @param keyBytes   key bytes, may be {@code null}
     * @param valueBytes value bytes, may be {@code null}
     * @return a newly allocated array containing both length-prefixed segments
     */
    public static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
        byte[] key = keyBytes == null ? EMPTY_BYTES : keyBytes;
        byte[] value = valueBytes == null ? EMPTY_BYTES : valueBytes;
        // Two int length prefixes (4 bytes each) plus both payloads.
        byte[] stitched = new byte[2 * Integer.BYTES + key.length + value.length];
        ByteBuffer buffer = ByteBuffer.wrap(stitched);
        buffer.putInt(key.length);
        buffer.put(key);
        buffer.putInt(value.length);
        buffer.put(value);
        return stitched;
    }

    /**
     * Converts a Pulsar message into a {@link BytesStreamMessage}.
     *
     * <p>When key/value stitching is enabled in the config, the message value is replaced by the
     * output of {@link #stitchKeyValue(byte[], byte[])}; otherwise the raw message data is used.
     * The message key is passed through unchanged in both cases.
     *
     * @param message      the Pulsar message to convert
     * @param pulsarConfig plugin configuration controlling stitching and metadata extraction
     * @return the stream message carrying key, value and extracted metadata
     */
    public static BytesStreamMessage buildPulsarStreamMessage(Message<byte[]> message, PulsarConfig pulsarConfig) {
        byte[] keyBytes = message.getKeyBytes();
        byte[] valueBytes = message.getData();
        if (pulsarConfig.getEnableKeyValueStitch()) {
            valueBytes = stitchKeyValue(keyBytes, valueBytes);
        }
        return new BytesStreamMessage(keyBytes, valueBytes, extractMessageMetadata(message, pulsarConfig));
    }

    /**
     * Builds the {@link StreamMessageMetadata} for a Pulsar message.
     *
     * <p>The ingestion time is the broker publish time when present, otherwise the message publish
     * time. The offset pair is the message id and the id returned by
     * {@link #getNextMessageId(MessageId)}. Headers (message properties) and the configured
     * metadata fields are only populated when {@code pulsarConfig.isPopulateMetadata()} is true,
     * and each is only set when non-empty.
     *
     * @param message      the Pulsar message to describe
     * @param pulsarConfig plugin configuration selecting which metadata to extract
     * @return the assembled metadata
     */
    @VisibleForTesting
    static StreamMessageMetadata extractMessageMetadata(Message<byte[]> message, PulsarConfig pulsarConfig) {
        long ingestionTimeMs = message.getBrokerPublishTime().orElse(message.getPublishTime());
        MessageId messageId = message.getMessageId();
        StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder()
                .setRecordIngestionTimeMs(ingestionTimeMs)
                .setOffset(new MessageIdStreamOffset(messageId), new MessageIdStreamOffset(getNextMessageId(messageId)));
        if (pulsarConfig.isPopulateMetadata()) {
            Map<String, String> properties = message.getProperties();
            if (!properties.isEmpty()) {
                GenericRow headers = new GenericRow();
                for (Map.Entry<String, String> property : properties.entrySet()) {
                    headers.putValue(property.getKey(), property.getValue());
                }
                builder.setHeaders(headers);
            }
            Set<PulsarStreamMessageMetadata.PulsarMessageMetadataValue> metadataFields = pulsarConfig.getMetadataFields();
            if (!metadataFields.isEmpty()) {
                Map<String, String> metadata = Maps.newHashMapWithExpectedSize(metadataFields.size());
                for (PulsarStreamMessageMetadata.PulsarMessageMetadataValue field : metadataFields) {
                    populateMetadataField(message, field, metadata);
                }
                builder.setMetadata(metadata);
            }
        }
        return builder.build();
    }

    /**
     * Computes the message id that immediately follows the given one.
     *
     * <ul>
     *   <li>Batch size {@code <= 0}: the next entry id with the same ledger and partition.</li>
     *   <li>Batched, not the last index of its batch: same entry, batch index + 1.</li>
     *   <li>Batched, last index of its batch: next entry id, batch index 0 (batch size and ack set
     *       are carried over from the given id).</li>
     * </ul>
     *
     * @param messageId the current message id; must be a {@link MessageIdAdv}
     * @return the id following {@code messageId}
     * @throws ClassCastException if {@code messageId} is not a {@link MessageIdAdv}
     */
    public static MessageId getNextMessageId(MessageId messageId) {
        MessageIdAdv current = (MessageIdAdv) messageId;
        long ledgerId = current.getLedgerId();
        long entryId = current.getEntryId();
        int partitionIndex = current.getPartitionIndex();
        int batchSize = current.getBatchSize();
        if (batchSize <= 0) {
            return new MessageIdImpl(ledgerId, entryId + 1, partitionIndex);
        }
        int batchIndex = current.getBatchIndex();
        BitSet ackSet = current.getAckSet();
        if (batchIndex < batchSize - 1) {
            return new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex + 1, batchSize, ackSet);
        }
        return new BatchMessageIdImpl(ledgerId, entryId + 1, partitionIndex, 0, batchSize, ackSet);
    }

    /**
     * Reads one metadata field from the message and stores it in {@code metadata} under the
     * field's key. Values that are absent, non-positive timestamps, or empty strings/arrays are
     * skipped (see the individual cases and the {@code setMetadataMapField} overloads).
     *
     * @param message  the Pulsar message to read from
     * @param field    which metadata field to extract
     * @param metadata destination map, mutated in place
     * @throws IllegalArgumentException if {@code field} is not handled by this method
     */
    private static void populateMetadataField(Message<byte[]> message, PulsarStreamMessageMetadata.PulsarMessageMetadataValue field, Map<String, String> metadata) {
        switch (field) {
            case PUBLISH_TIME:
                long publishTime = message.getPublishTime();
                // Only positive timestamps are recorded.
                if (publishTime > 0) {
                    setMetadataMapField(metadata, field, publishTime);
                }
                break;
            case EVENT_TIME:
                long eventTime = message.getEventTime();
                // Only positive timestamps are recorded.
                if (eventTime > 0) {
                    setMetadataMapField(metadata, field, eventTime);
                }
                break;
            case BROKER_PUBLISH_TIME:
                message.getBrokerPublishTime()
                        .ifPresent(brokerPublishTime -> setMetadataMapField(metadata, field, brokerPublishTime.longValue()));
                break;
            case MESSAGE_KEY:
                setMetadataMapField(metadata, field, message.getKey());
                break;
            case MESSAGE_ID:
                setMetadataMapField(metadata, field, message.getMessageId().toString());
                break;
            case MESSAGE_ID_BYTES_B64:
                setMetadataMapField(metadata, field, message.getMessageId().toByteArray());
                break;
            case PRODUCER_NAME:
                setMetadataMapField(metadata, field, message.getProducerName());
                break;
            case SCHEMA_VERSION:
                setMetadataMapField(metadata, field, message.getSchemaVersion());
                break;
            case SEQUENCE_ID:
                setMetadataMapField(metadata, field, message.getSequenceId());
                break;
            case ORDERING_KEY:
                if (message.hasOrderingKey()) {
                    setMetadataMapField(metadata, field, message.getOrderingKey());
                }
                break;
            case SIZE:
                setMetadataMapField(metadata, field, message.size());
                break;
            case TOPIC_NAME:
                setMetadataMapField(metadata, field, message.getTopicName());
                break;
            case INDEX:
                message.getIndex().ifPresent(index -> setMetadataMapField(metadata, field, index.longValue()));
                break;
            case REDELIVERY_COUNT:
                setMetadataMapField(metadata, field, message.getRedeliveryCount());
                break;
            default:
                throw new IllegalArgumentException("Unsupported metadata field: " + field);
        }
    }

    /**
     * Stores a string value under the field's key; {@code null} and empty strings are skipped.
     */
    private static void setMetadataMapField(Map<String, String> map, PulsarStreamMessageMetadata.PulsarMessageMetadataValue field, String value) {
        if (StringUtils.isNotEmpty(value)) {
            map.put(field.getKey(), value);
        }
    }

    /** Stores an {@code int} value, rendered in decimal, under the field's key. */
    private static void setMetadataMapField(Map<String, String> map, PulsarStreamMessageMetadata.PulsarMessageMetadataValue field, int value) {
        setMetadataMapField(map, field, Integer.toString(value));
    }

    /**
     * Stores a {@code long} value, rendered in decimal, under the field's key.
     *
     * <p>NOTE(review): the decompiler reported that this overload's access modifier was changed
     * from private; visibility is left public here so the visible interface is unchanged. Confirm
     * against the upstream source before narrowing it.
     */
    public static void setMetadataMapField(Map<String, String> map, PulsarStreamMessageMetadata.PulsarMessageMetadataValue field, long value) {
        setMetadataMapField(map, field, Long.toString(value));
    }

    /**
     * Stores a byte array, Base64-encoded, under the field's key; {@code null} and empty arrays
     * are skipped.
     */
    private static void setMetadataMapField(Map<String, String> map, PulsarStreamMessageMetadata.PulsarMessageMetadataValue field, byte[] value) {
        if (value == null || value.length == 0) {
            return;
        }
        setMetadataMapField(map, field, Base64.getEncoder().encodeToString(value));
    }
}
