package org.apache.pinot.plugin.stream.kinesis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.plugin.stream.kinesis.server.KinesisDataProducer;
import org.apache.pinot.shaded.software.amazon.awssdk.services.kinesis.model.Shard;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.class */
/**
 * {@link StreamMetadataProvider} for Kinesis that derives the list of partition groups to consume
 * from the shards reported by the {@link KinesisConnectionHandler} and the consumption status of
 * the partition groups that are already known.
 *
 * <p>Each partition group maps to exactly one shard; the partition group id is the numeric suffix
 * of the shard id (see {@link #getPartitionGroupIdFromShardId(String)}).
 */
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamMetadataProvider.class);

    /** Textual prefix that precedes the numeric part of a shard id. */
    private static final String SHARD_ID_PREFIX = "shardId-";

    /** Padding character stripped from the front of the numeric part of a shard id. */
    private static final String SHARD_ID_PADDING = "0";

    private final KinesisConnectionHandler _kinesisConnectionHandler;
    private final StreamConsumerFactory _kinesisStreamConsumerFactory;
    private final String _clientId;
    private final int _fetchTimeoutMs;

    /**
     * Creates a provider that builds its own connection handler and consumer factory from the
     * given stream config.
     *
     * @param str client id passed to the partition group consumers this provider creates
     * @param streamConfig stream configuration; also supplies the fetch timeout
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig) {
        this._kinesisConnectionHandler = new KinesisConnectionHandler(new KinesisConfig(streamConfig));
        this._kinesisStreamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
        this._clientId = str;
        this._fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
    }

    /**
     * Creates a provider that uses the supplied connection handler and consumer factory instead of
     * building them from the stream config.
     *
     * @param str client id passed to the partition group consumers this provider creates
     * @param streamConfig stream configuration; only the fetch timeout is read from it here
     * @param kinesisConnectionHandler handler used to list the shards of the stream
     * @param streamConsumerFactory factory used to create partition group consumers
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig,
            KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) {
        this._kinesisConnectionHandler = kinesisConnectionHandler;
        this._kinesisStreamConsumerFactory = streamConsumerFactory;
        this._clientId = str;
        this._fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
    }

    /**
     * Not supported by this provider.
     *
     * @throws UnsupportedOperationException always
     */
    public int fetchPartitionCount(long j) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this provider.
     *
     * @throws UnsupportedOperationException always
     */
    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long j) {
        throw new UnsupportedOperationException();
    }

    /**
     * Computes the partition groups that should be consumed next.
     *
     * <p>For every partition group in {@code list}:
     * <ul>
     *   <li>if its shard is no longer reported by the connection handler, the shard is recorded as
     *       ended, a warning is logged and the group is dropped;</li>
     *   <li>if it has no end offset, it is kept with its current start offset;</li>
     *   <li>if it has an end offset and its shard is closed (has an ending sequence number) and a
     *       fetch from the end offset reports end-of-partition-group, the shard is recorded as ended
     *       and the group is dropped;</li>
     *   <li>otherwise it is kept, starting from its end offset.</li>
     * </ul>
     *
     * <p>Afterwards, every shard that is not referenced by {@code list} is added as a new partition
     * group starting at the shard's starting sequence number, provided it has no parent, its parent
     * is not among the reported shards, or its parent was recorded as ended above.
     *
     * @param str not used by this implementation
     * @param streamConfig not used by this implementation
     * @param list consumption status of the currently known partition groups; each start offset
     *     must be a {@link KinesisPartitionGroupOffset} holding at least one shard entry
     * @param i not used by this implementation
     * @return the partition groups to consume, surviving existing groups first, then new ones
     * @throws IOException if checking a shard for end-of-data or closing the consumer fails
     * @throws TimeoutException if fetching from a shard while checking for end-of-data times out
     */
    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String str, StreamConfig streamConfig,
            List<PartitionGroupConsumptionStatus> list, int i) throws IOException, TimeoutException {
        List<PartitionGroupMetadata> newPartitionGroups = new ArrayList<>();
        // On duplicate shard ids the first occurrence wins.
        Map<String, Shard> shardsById = this._kinesisConnectionHandler.getShards().stream()
                .collect(Collectors.toMap(Shard::shardId, shard -> shard, (first, duplicate) -> first));
        Set<String> knownShardIds = new HashSet<>();
        Set<String> endedShardIds = new HashSet<>();

        for (PartitionGroupConsumptionStatus status : list) {
            KinesisPartitionGroupOffset startCheckpoint = (KinesisPartitionGroupOffset) status.getStartOffset();
            // Only the first shard of the checkpoint is considered.
            String shardId = startCheckpoint.getShardToStartSequenceMap().keySet().iterator().next();
            knownShardIds.add(shardId);

            Shard shard = shardsById.get(shardId);
            if (shard == null) {
                endedShardIds.add(shardId);
                LOGGER.warn("Kinesis shard with id: {} has expired. Data has been consumed from the shard till "
                        + "sequence number: {}. There can be potential data loss.", shardId,
                        startCheckpoint.getShardToStartSequenceMap().get(shardId));
                continue;
            }

            StreamPartitionMsgOffset endOffset = status.getEndOffset();
            StreamPartitionMsgOffset nextStartOffset;
            if (endOffset == null) {
                nextStartOffset = status.getStartOffset();
            } else if (shard.sequenceNumberRange().endingSequenceNumber() != null
                    && consumedEndOfShard(endOffset, status)) {
                // Closed shard that has been read to its end: nothing left to consume, so the
                // group must not be emitted again. Its children become eligible below.
                endedShardIds.add(shardId);
                continue;
            } else {
                nextStartOffset = endOffset;
            }
            newPartitionGroups.add(new PartitionGroupMetadata(status.getPartitionGroupId(), nextStartOffset));
        }

        for (Map.Entry<String, Shard> entry : shardsById.entrySet()) {
            String shardId = entry.getKey();
            if (knownShardIds.contains(shardId)) {
                continue;
            }
            Shard shard = entry.getValue();
            String parentShardId = shard.parentShardId();
            boolean parentFinished = parentShardId == null
                    || !shardsById.containsKey(parentShardId)
                    || endedShardIds.contains(parentShardId);
            if (!parentFinished) {
                // Parent still has data to be consumed; do not start on the child yet.
                continue;
            }
            Map<String, String> shardToStartSequence = new HashMap<>();
            shardToStartSequence.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
            newPartitionGroups.add(new PartitionGroupMetadata(getPartitionGroupIdFromShardId(shardId),
                    new KinesisPartitionGroupOffset(shardToStartSequence)));
        }
        return newPartitionGroups;
    }

    /**
     * Derives the partition group id from a shard id by removing the {@code "shardId-"} prefix and
     * any leading zeros, then parsing the remainder as an integer.
     *
     * @param shardId shard id, e.g. {@code "shardId-000000000012"}
     * @return the numeric suffix of the shard id, or {@code 0} if nothing remains after stripping
     * @throws NumberFormatException if the remainder is not a valid {@code int}
     */
    private int getPartitionGroupIdFromShardId(String shardId) {
        String shardNumber = StringUtils.stripStart(StringUtils.removeStart(shardId, SHARD_ID_PREFIX),
                SHARD_ID_PADDING);
        // An all-zero suffix strips down to the empty string, which stands for id 0.
        return shardNumber.isEmpty() ? 0 : Integer.parseInt(shardNumber);
    }

    /**
     * Checks whether consumption has reached the end of the partition group by fetching one batch
     * starting at the given offset with a temporary consumer.
     *
     * @param startOffset offset to fetch from
     * @param consumptionStatus status used to create the temporary consumer
     * @return the batch's end-of-partition-group flag
     * @throws IOException if fetching or closing the consumer fails
     * @throws TimeoutException if the fetch does not complete within the fetch timeout
     */
    private boolean consumedEndOfShard(StreamPartitionMsgOffset startOffset,
            PartitionGroupConsumptionStatus consumptionStatus) throws IOException, TimeoutException {
        PartitionGroupConsumer consumer =
                this._kinesisStreamConsumerFactory.createPartitionGroupConsumer(this._clientId, consumptionStatus);
        try {
            MessageBatch batch =
                    consumer.fetchMessages(startOffset, (StreamPartitionMsgOffset) null, this._fetchTimeoutMs);
            return batch.isEndOfPartitionGroup();
        } finally {
            // Close exactly once, on both the success and the failure path.
            consumer.close();
        }
    }

    /**
     * No-op.
     *
     * <p>NOTE(review): the connection handler is not released here — confirm whether it holds
     * resources that this provider should close.
     */
    public void close() {
    }
}
