package org.apache.pinot.plugin.stream.pulsar;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.class */
/**
 * A {@link MessageBatch} backed by an in-memory list of Pulsar {@link Message}s.
 *
 * <p>When key/value stitching is enabled, the payload exposed for a message is
 * {@code [4-byte key length][key bytes][4-byte value length][value bytes]} (lengths written with
 * {@link ByteBuffer#putInt(int)}, i.e. big-endian by default); otherwise it is the message data
 * as returned by {@link Message#getData()}.
 */
public class PulsarMessageBatch implements MessageBatch<byte[]> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageBatch.class);

    /** Bytes taken by the two int length prefixes (key length + value length) in a stitched payload. */
    private static final int STITCH_HEADER_BYTES = 2 * Integer.BYTES;

    private final List<Message<byte[]>> _messageList = new ArrayList<>();
    private final boolean _enableKeyValueStitch;

    /**
     * Copies every message from {@code iterable} into this batch, in iteration order.
     *
     * @param iterable messages that make up the batch
     * @param enableKeyValueStitch when {@code true}, payloads returned by
     *     {@link #getMessageAtIndex(int)} are the key and value stitched together
     */
    public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean enableKeyValueStitch) {
        iterable.forEach(_messageList::add);
        _enableKeyValueStitch = enableKeyValueStitch;
    }

    /** Returns the number of messages held by this batch. */
    @Override
    public int getMessageCount() {
        return _messageList.size();
    }

    /**
     * Returns the payload of the message at {@code index}: the stitched key/value bytes when
     * stitching is enabled, otherwise the message data.
     */
    @Override
    public byte[] getMessageAtIndex(int index) {
        Message<byte[]> message = _messageList.get(index);
        if (_enableKeyValueStitch) {
            return stitchKeyValue(message.getKeyBytes(), message.getData());
        }
        return message.getData();
    }

    /**
     * Returns the array offset of a buffer wrapped around the message data. Because the buffer is
     * freshly wrapped around the whole array, this is always {@code 0}.
     */
    @Override
    public int getMessageOffsetAtIndex(int index) {
        return ByteBuffer.wrap(_messageList.get(index).getData()).arrayOffset();
    }

    /**
     * Returns the length of the payload for the message at {@code index}; with stitching enabled
     * this includes the two length prefixes and the key bytes.
     */
    @Override
    public int getMessageLengthAtIndex(int index) {
        Message<byte[]> message = _messageList.get(index);
        if (!_enableKeyValueStitch) {
            return message.getData().length;
        }
        return STITCH_HEADER_BYTES + message.getKeyBytes().length + message.getData().length;
    }

    /**
     * Builds the offset that follows the message at {@code index}.
     *
     * <p>For a batch message id the batch index is advanced by one; once the last index of the
     * batch has been reached, the entry id is advanced instead and the batch index reset to 0.
     * For a non-batch message id the entry id is advanced by one.
     */
    @Override
    public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
        MessageIdImpl currentId = MessageIdImpl.convertToMessageIdImpl(_messageList.get(index).getMessageId());
        long ledgerId = currentId.getLedgerId();
        long entryId = currentId.getEntryId();
        int partitionIndex = currentId.getPartitionIndex();

        MessageId nextId;
        if (currentId instanceof BatchMessageIdImpl) {
            BatchMessageIdImpl batchId = (BatchMessageIdImpl) currentId;
            int batchIndex = batchId.getBatchIndex();
            int batchSize = batchId.getBatchSize();
            if (batchIndex < batchSize - 1) {
                // Still inside the same entry: move to the next message of the batch.
                nextId = new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex + 1, batchSize,
                        batchId.getAcker());
            } else {
                // Last message of the batch: move to the first message of the next entry.
                nextId = new BatchMessageIdImpl(ledgerId, entryId + 1, partitionIndex, 0, batchSize,
                        batchId.getAcker());
            }
        } else {
            nextId = DefaultImplementation.newMessageId(ledgerId, entryId + 1, partitionIndex);
        }
        return new MessageIdStreamOffset(nextId);
    }

    /**
     * Not supported for this batch type.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long getNextStreamMessageOffsetAtIndex(int index) {
        throw new UnsupportedOperationException("Pulsar does not support long stream offsets");
    }

    /**
     * Concatenates key and value as {@code [int keyLen][key][int valueLen][value]}.
     *
     * <p>A buffer is allocated per call so that concurrent callers never share scratch state.
     */
    private static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
        byte[] stitched = new byte[STITCH_HEADER_BYTES + keyBytes.length + valueBytes.length];
        ByteBuffer.wrap(stitched)
                .putInt(keyBytes.length)
                .put(keyBytes)
                .putInt(valueBytes.length)
                .put(valueBytes);
        return stitched;
    }
}
