package org.apache.pinot.plugin.stream.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.shaded.com.google.common.collect.Iterables;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.class */
/**
 * Partition-level consumer that reads messages of a single Pulsar topic partition through a Pulsar
 * {@link Reader}.
 *
 * <p>Every fetch runs on a dedicated single-thread executor so that the calling thread can stop
 * waiting once the supplied timeout elapses and return whatever has been read so far. Because the
 * accumulating list is therefore touched by two threads, additions and snapshots are guarded by the
 * list's own monitor.
 */
public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionGroupConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);

    /** Grace period granted to an in-flight fetch before the executor is forcibly shut down. */
    private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 60L;

    private final ExecutorService _executorService;
    // Left as a raw type: the element type returned by createReaderForPartition is not visible here.
    private final Reader _reader;
    private final boolean _enableKeyValueStitch;

    /**
     * Creates the Pulsar reader for the partition described by {@code partitionGroupConsumptionStatus}
     * and the single-thread executor on which fetches are run.
     *
     * @param str identifier forwarded to the parent connection handler and to {@code PulsarConfig}
     * @param streamConfig stream configuration of the table being consumed
     * @param partitionGroupConsumptionStatus supplies the partition group id to read from
     */
    public PulsarPartitionLevelConsumer(String str, StreamConfig streamConfig, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
        super(str, streamConfig);
        PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, str);
        _reader = createReaderForPartition(pulsarConfig.getPulsarTopicName(),
                partitionGroupConsumptionStatus.getPartitionGroupId(), pulsarConfig.getInitialMessageId());
        LOGGER.info("Created pulsar reader with id {} for topic {} partition {}", _reader,
                _config.getPulsarTopicName(), partitionGroupConsumptionStatus.getPartitionGroupId());
        _executorService = Executors.newSingleThreadExecutor();
        _enableKeyValueStitch = _config.getEnableKeyValueStitch();
    }

    /**
     * Fetches messages between two offsets, waiting at most {@code i} milliseconds.
     *
     * <p>The read is delegated to the executor thread. If it does not finish in time, fails, or the
     * calling thread is interrupted, the messages collected so far are returned instead of
     * propagating the failure.
     *
     * @param streamPartitionMsgOffset start offset (inclusive); must be a {@code MessageIdStreamOffset}
     * @param streamPartitionMsgOffset2 end offset (exclusive), or {@code null} to use
     *     {@link MessageId#latest}
     * @param i maximum time to wait for the fetch, in milliseconds
     * @return a batch with the messages that passed the offset filter
     * @throws ClassCastException if an offset is not a {@code MessageIdStreamOffset}
     */
    @Override
    public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2, int i) {
        final MessageId startMessageId = ((MessageIdStreamOffset) streamPartitionMsgOffset).getMessageId();
        final MessageId endMessageId = streamPartitionMsgOffset2 == null
                ? MessageId.latest
                : ((MessageIdStreamOffset) streamPartitionMsgOffset2).getMessageId();
        final List<PulsarStreamMessage> messages = new ArrayList<>();
        Future<PulsarMessageBatch> pending =
                _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messages));
        try {
            return pending.get(i, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Expected when the timeout elapses first: stop the reader task and hand back what was read.
            pending.cancel(true);
            return buildPartialBatch(messages, startMessageId, endMessageId);
        } catch (InterruptedException e) {
            LOGGER.warn("Error while fetching records from Pulsar", e);
            pending.cancel(true);
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            return buildPartialBatch(messages, startMessageId, endMessageId);
        } catch (Exception e) {
            LOGGER.warn("Error while fetching records from Pulsar", e);
            return buildPartialBatch(messages, startMessageId, endMessageId);
        }
    }

    /**
     * Seeks the reader to {@code messageId} and appends messages to {@code list} until no message is
     * available, a message past {@code messageId2} is seen, or the current thread is interrupted.
     *
     * <p>Each addition is performed while holding the monitor of {@code list}, so another thread can
     * take a consistent copy by synchronizing on the same list. A {@link PulsarClientException} is
     * logged and the messages read up to that point are still returned.
     *
     * @param messageId id to seek to; also the inclusive lower bound of the returned batch
     * @param messageId2 exclusive upper bound of the returned batch, or {@code null} for no bound
     * @param list destination list the read messages are appended to
     * @return a batch built from the offset-filtered contents of {@code list}
     */
    public PulsarMessageBatch fetchMessages(MessageId messageId, MessageId messageId2, List<PulsarStreamMessage> list) {
        try {
            _reader.seek(messageId);
            while (_reader.hasMessageAvailable()) {
                Message nextMessage = _reader.readNext();
                if (messageId2 != null && nextMessage.getMessageId().compareTo(messageId2) > 0) {
                    break;
                }
                PulsarStreamMessage streamMessage =
                        PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor);
                synchronized (list) {
                    list.add(streamMessage);
                }
                // Set by Future.cancel(true) when the caller has given up waiting.
                if (Thread.interrupted()) {
                    break;
                }
            }
        } catch (PulsarClientException e) {
            LOGGER.warn("Error consuming records from Pulsar topic", e);
        }
        return new PulsarMessageBatch(buildOffsetFilteringIterable(list, messageId, messageId2), _enableKeyValueStitch);
    }

    /**
     * Builds a batch from a snapshot of {@code messages}, copied under the list's monitor because the
     * reader task may still be appending to it after a timeout or cancellation.
     */
    private PulsarMessageBatch buildPartialBatch(List<PulsarStreamMessage> messages, MessageId startMessageId, MessageId endMessageId) {
        List<PulsarStreamMessage> snapshot;
        synchronized (messages) {
            snapshot = new ArrayList<>(messages);
        }
        return new PulsarMessageBatch(buildOffsetFilteringIterable(snapshot, startMessageId, endMessageId), _enableKeyValueStitch);
    }

    /**
     * Returns a filtered view of {@code list} that keeps only non-null messages with a non-null
     * value whose id is at or after {@code messageId} and, when {@code messageId2} is non-null,
     * strictly before it.
     */
    private Iterable<PulsarStreamMessage> buildOffsetFilteringIterable(List<PulsarStreamMessage> list, MessageId messageId, MessageId messageId2) {
        return Iterables.filter(list, message ->
                message != null
                        && message.getValue() != null
                        && message.getMessageId().compareTo(messageId) >= 0
                        && (messageId2 == null || message.getMessageId().compareTo(messageId2) < 0));
    }

    /**
     * Closes the reader, then the parent connection handler, then shuts the executor down.
     *
     * <p>Each step runs even if an earlier one throws, so a failing reader close cannot leak the
     * executor thread.
     *
     * @throws IOException if closing the reader or the parent handler fails
     */
    @Override
    public void close() throws IOException {
        try {
            _reader.close();
        } finally {
            try {
                super.close();
            } finally {
                shutdownAndAwaitTermination();
            }
        }
    }

    /**
     * Shuts the executor down, waiting up to {@link #EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS} seconds for
     * a running fetch before forcing termination. If the waiting thread is interrupted, termination
     * is forced immediately and the interrupt flag is restored.
     */
    void shutdownAndAwaitTermination() {
        _executorService.shutdown();
        try {
            if (!_executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                _executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            _executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
