package org.apache.pinot.plugin.stream.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.ConsumerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.class */
/**
 * {@link StreamMetadataProvider} implementation backed by a Pulsar client.
 *
 * <p>The Pulsar client ({@code _pulsarClient}) and topic ({@code _topic}) are inherited from
 * {@link PulsarPartitionLevelConnectionHandler}. Every method that needs to inspect a topic creates
 * a short-lived consumer with a random subscription name and closes it before returning.
 */
public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnectionHandler implements StreamMetadataProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class);
    private final StreamConfig _streamConfig;
    private final int _partition;

    /**
     * Creates a provider bound to partition {@code 0}.
     *
     * @param str          forwarded unchanged to the three-argument constructor
     * @param streamConfig stream configuration; its topic name is used by {@link #fetchPartitionCount(long)}
     */
    public PulsarStreamMetadataProvider(String str, StreamConfig streamConfig) {
        this(str, streamConfig, 0);
    }

    /**
     * Creates a provider bound to the given partition.
     *
     * @param str          forwarded unchanged to the parent connection handler
     *                     (NOTE(review): presumably a client id — confirm against the parent class)
     * @param streamConfig stream configuration; its topic name is used by {@link #fetchPartitionCount(long)}
     * @param i            partition this provider is associated with; only used in log messages here
     */
    public PulsarStreamMetadataProvider(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig, i);
        this._streamConfig = streamConfig;
        this._partition = i;
    }

    /**
     * Returns the number of partitions Pulsar reports for the configured topic.
     *
     * @param j timeout supplied by the caller; it is currently NOT applied, the lookup blocks until
     *          the client answers
     * @return number of partitions of the topic
     * @throws RuntimeException if the lookup fails or the calling thread is interrupted
     */
    @Override
    public int fetchPartitionCount(long j) {
        String topicName = this._streamConfig.getTopicName();
        try {
            return this._pulsarClient.getPartitionsForTopic(topicName).get().size();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Cannot fetch partitions for topic: " + topicName, e);
        } catch (Exception e) {
            throw new RuntimeException("Cannot fetch partitions for topic: " + topicName, e);
        }
    }

    /**
     * Resolves an offset criteria to a concrete message id on the topic.
     *
     * <p>For the "largest" criteria the last message id of the topic is returned; for the "smallest"
     * criteria the id of the first message received by a fresh subscription is returned.
     *
     * @param offsetCriteria criteria to resolve; must not be {@code null}
     * @param j              timeout supplied by the caller; it is currently NOT applied, so the
     *                       "smallest" lookup blocks until a message is available
     * @return the resolved offset, or {@code null} if the Pulsar client reported an error
     * @throws IllegalArgumentException if the criteria is neither "largest" nor "smallest"
     */
    @Override
    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long j) {
        Preconditions.checkNotNull(offsetCriteria);
        // Declared outside the try so the finally block can close it on every exit path.
        Consumer<byte[]> consumer = null;
        try {
            consumer = this._pulsarClient.newConsumer()
                    .topic(this._topic)
                    .subscriptionInitialPosition(PulsarUtils.offsetCriteriaToSubscription(offsetCriteria))
                    .subscriptionName("Pinot_" + UUID.randomUUID())
                    .subscribe();

            MessageId messageId;
            if (offsetCriteria.isLargest()) {
                messageId = consumer.getLastMessageId();
            } else if (offsetCriteria.isSmallest()) {
                messageId = consumer.receive().getMessageId();
            } else {
                throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
            }
            return new MessageIdStreamOffset(messageId);
        } catch (PulsarClientException e) {
            LOGGER.error("Cannot fetch offsets for partition {} and topic {} and offsetCriteria {}", this._partition,
                    this._topic, offsetCriteria, e);
            return null;
        } finally {
            closeConsumer(consumer);
        }
    }

    /**
     * Builds the partition group metadata list: one entry per already-known partition group (reusing
     * its start offset) followed by one entry per partition that Pulsar reports beyond those.
     *
     * <p>Newly discovered partitions are identified by index: any partition whose index is
     * {@code >= list.size()} is treated as new. For each of them the start offset is the id of the
     * first message received within the timeout, else the topic's last message id, else
     * {@link MessageId#latest} if that lookup also times out.
     *
     * <p>Failures while discovering new partitions are logged and swallowed; the entries collected
     * up to that point are still returned.
     *
     * @param str          passed to {@link PulsarConfig} together with {@code streamConfig}
     * @param streamConfig stream configuration used to derive the initial subscriber position
     * @param list         consumption status of the partition groups that are already known
     * @param i            timeout in milliseconds applied to each receive / last-message-id call
     * @return metadata for known and newly discovered partition groups
     */
    @Override
    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String str, StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> list, int i) {
        List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
        for (PartitionGroupConsumptionStatus status : list) {
            partitionGroupMetadataList.add(new PartitionGroupMetadata(status.getPartitionGroupId(), status.getStartOffset()));
        }

        PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, str);
        try {
            List<String> partitionTopics = this._pulsarClient.getPartitionsForTopic(this._topic).get();
            for (int partitionId = list.size(); partitionId < partitionTopics.size(); partitionId++) {
                // One consumer per new partition; closed in the finally below so none are leaked
                // when the loop moves on or an exception aborts it.
                Consumer<byte[]> consumer = null;
                try {
                    consumer = this._pulsarClient.newConsumer()
                            .topic(partitionTopics.get(partitionId))
                            .subscriptionInitialPosition(pulsarConfig.getInitialSubscriberPosition())
                            .subscriptionName(ConsumerName.generateRandomName())
                            .subscribe();

                    MessageId messageId;
                    Message<byte[]> firstMessage = consumer.receive(i, TimeUnit.MILLISECONDS);
                    if (firstMessage != null) {
                        messageId = firstMessage.getMessageId();
                    } else {
                        // Nothing arrived within the timeout: fall back to the last message id.
                        try {
                            messageId = consumer.getLastMessageIdAsync().get(i, TimeUnit.MILLISECONDS);
                        } catch (TimeoutException ignored) {
                            // The broker did not answer in time; use the symbolic "latest" position.
                            messageId = MessageId.latest;
                        }
                    }
                    partitionGroupMetadataList.add(new PartitionGroupMetadata(partitionId, new MessageIdStreamOffset(messageId)));
                } finally {
                    closeConsumer(consumer);
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag; the exception itself is only logged.
                Thread.currentThread().interrupt();
            }
            LOGGER.warn("Error encountered while calculating pulsar partition group metadata: " + e.getMessage(), e);
        }
        return partitionGroupMetadataList;
    }

    /**
     * Closes the given consumer, logging (not propagating) any failure.
     *
     * @param consumer consumer to close; {@code null} is ignored
     */
    private void closeConsumer(Consumer<?> consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", consumer, e);
        }
    }

    /**
     * Delegates to the parent connection handler's {@code close()}.
     *
     * @throws IOException if the parent fails to close
     */
    @Override
    public void close() throws IOException {
        super.close();
    }
}
