package org.apache.pinot.plugin.stream.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.class */
/**
 * {@link PartitionGroupConsumer} that reads the messages of one Pulsar topic partition through a
 * Pulsar {@link Reader}.
 *
 * <p>The consumer remembers the message id that follows the last batch it returned, so that a
 * subsequent fetch starting at exactly that id can continue reading without seeking the reader.
 * {@link #m3fetchMessages(StreamPartitionMsgOffset, int)} is {@code synchronized}; {@link #close()}
 * is not.
 */
public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionGroupConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);

    private final Reader<byte[]> _reader;

    // Message id expected as the start of the next fetch; stays null until the first fetch completes.
    private MessageId _nextMessageId;

    /**
     * Creates a reader positioned at the earliest message (inclusive) of the requested partition.
     *
     * @param str value forwarded unchanged to the parent connection handler
     * @param streamConfig stream configuration forwarded to the parent connection handler
     * @param i index into the partition list that the Pulsar client reports for the configured topic
     * @throws RuntimeException if the partition lookup or the reader creation fails for any reason
     */
    public PulsarPartitionLevelConsumer(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig);
        String pulsarTopicName = this._config.getPulsarTopicName();
        try {
            List<String> partitions = this._pulsarClient.getPartitionsForTopic(pulsarTopicName).get();
            this._reader = this._pulsarClient.newReader()
                    .topic(partitions.get(i))
                    .startMessageId(MessageId.earliest)
                    .startMessageIdInclusive()
                    .create();
            LOGGER.info("Created Pulsar reader for topic: {}, partition: {}", pulsarTopicName, i);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Caught exception while creating Pulsar reader for topic: %s, partition: %d", pulsarTopicName, i), e);
        }
    }

    /**
     * Reads the messages currently available from the given offset, stopping when the reader
     * reports no more available messages or the time budget is used up.
     *
     * <p>The reader is only repositioned when the requested offset differs from the one recorded
     * at the end of the previous fetch.
     *
     * @param streamPartitionMsgOffset offset to start from; must be a {@link MessageIdStreamOffset}
     * @param i time budget in milliseconds, measured from the moment this method is entered
     * @return the fetched messages, the offset to resume from (the requested offset when nothing
     *     was read), and whether the reader reports that the end of the topic has been reached
     * @throws RuntimeException if seeking, checking for available messages, or reading fails
     */
    /* renamed from: fetchMessages, reason: merged with bridge method [inline-methods] */
    public synchronized PulsarMessageBatch m3fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, int i) {
        MessageIdStreamOffset startOffset = (MessageIdStreamOffset) streamPartitionMsgOffset;
        MessageId startMessageId = startOffset.getMessageId();
        long deadlineMs = System.currentTimeMillis() + i;
        List<BytesStreamMessage> messages = new ArrayList<>();

        // Skip the seek when the caller continues exactly where the previous fetch stopped.
        if (!Objects.equals(startMessageId, this._nextMessageId)) {
            try {
                this._reader.seek(startMessageId);
            } catch (PulsarClientException e) {
                throw new RuntimeException("Caught exception while seeking to message id: " + startMessageId, e);
            }
        }

        // The availability check is inside the try as well, so a failure there is wrapped the
        // same way as a failure of the read itself.
        try {
            while (this._reader.hasMessageAvailable() && System.currentTimeMillis() < deadlineMs) {
                messages.add(PulsarUtils.buildPulsarStreamMessage(this._reader.readNext(), this._config));
            }
        } catch (PulsarClientException e) {
            throw new RuntimeException("Caught exception while fetching messages from Pulsar", e);
        }

        MessageIdStreamOffset offsetOfNextBatch;
        if (messages.isEmpty()) {
            // Nothing was read: the caller should retry from the same offset.
            offsetOfNextBatch = startOffset;
        } else {
            StreamMessageMetadata lastMessageMetadata = messages.get(messages.size() - 1).getMetadata();
            assert lastMessageMetadata != null;
            offsetOfNextBatch = (MessageIdStreamOffset) lastMessageMetadata.getNextOffset();
        }
        assert offsetOfNextBatch != null;
        this._nextMessageId = offsetOfNextBatch.getMessageId();
        return new PulsarMessageBatch(messages, offsetOfNextBatch, this._reader.hasReachedEndOfTopic());
    }

    /**
     * Closes the reader and then the parent connection handler.
     *
     * <p>The parent is closed even when closing the reader fails, so that its resources are not
     * leaked; if both fail, the parent's exception is the one that propagates.
     *
     * @throws IOException if closing the reader or the parent fails
     */
    @Override // org.apache.pinot.plugin.stream.pulsar.PulsarPartitionLevelConnectionHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this._reader.close();
        } finally {
            super.close();
        }
    }
}
