package org.apache.pinot.plugin.stream.kinesis;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.class */
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
    /** Prefix removed from a shard id to obtain its numeric part, and prepended to a partition id to form a shard id. */
    public static final String SHARD_ID_PREFIX = "shardId-";
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamMetadataProvider.class);

    /** Source of the shard list and the stream names. */
    private final KinesisConnectionHandler _kinesisConnectionHandler;
    /** Creates the short-lived consumers used to probe whether a shard has been read to its end. */
    private final StreamConsumerFactory _kinesisStreamConsumerFactory;
    /** Client id handed to the consumer factory when a probe consumer is created. */
    private final String _clientId;
    /** Fetch timeout in milliseconds, read from the stream config at construction time. */
    private final int _fetchTimeoutMs;
    /** Shard id suffix (the part after {@link #SHARD_ID_PREFIX}) this provider looks up offsets for. */
    private final String _partitionId;

    /**
     * {@link StreamMetadataProvider.TopicMetadata} implementation that carries nothing but a stream name.
     */
    public static class KinesisTopicMetadata implements StreamMetadataProvider.TopicMetadata {
        private String _name;

        /**
         * @return the name last passed to {@link #setName(String)}, or {@code null} if it was never called
         */
        public String getName() {
            return _name;
        }

        /**
         * Stores the stream name.
         *
         * @param str the stream name to remember
         * @return this instance, so the call can be chained
         */
        public KinesisTopicMetadata setName(String str) {
            _name = str;
            return this;
        }
    }

    /**
     * Creates a provider without an explicit partition id.
     *
     * <p>The partition id is set to the decimal string form of {@link Integer#MIN_VALUE}
     * (presumably a "no partition" placeholder; confirm against callers).
     *
     * @param str client id passed on to consumers created by this provider
     * @param streamConfig stream configuration used to build the connection handler and consumer factory
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig) {
        this(str, streamConfig, Integer.toString(Integer.MIN_VALUE));
    }

    /**
     * Creates a provider for one partition, building its own connection handler and consumer factory
     * from the stream configuration.
     *
     * @param str client id passed on to consumers created by this provider
     * @param streamConfig stream configuration; also supplies the fetch timeout
     * @param str2 partition id, i.e. the shard id without the {@link #SHARD_ID_PREFIX}
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig, String str2) {
        // Arguments are evaluated left to right, so the handler is still built before the factory.
        this(str, streamConfig, str2, new KinesisConnectionHandler(new KinesisConfig(streamConfig)),
                StreamConsumerFactoryProvider.create(streamConfig));
    }

    /**
     * Creates a provider around caller-supplied collaborators, without an explicit partition id.
     *
     * <p>The partition id is set to the decimal string form of {@link Integer#MIN_VALUE}
     * (presumably a "no partition" placeholder; confirm against callers).
     *
     * @param str client id passed on to consumers created by this provider
     * @param streamConfig stream configuration; supplies the fetch timeout
     * @param kinesisConnectionHandler handler used to list shards and stream names
     * @param streamConsumerFactory factory used to create partition group consumers
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig, KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) {
        this(str, streamConfig, Integer.toString(Integer.MIN_VALUE), kinesisConnectionHandler, streamConsumerFactory);
    }

    /**
     * Creates a provider for one partition around caller-supplied collaborators.
     *
     * @param str client id passed on to consumers created by this provider
     * @param streamConfig stream configuration; only its fetch timeout is read here
     * @param str2 partition id, i.e. the shard id without the {@link #SHARD_ID_PREFIX}
     * @param kinesisConnectionHandler handler used to list shards and stream names
     * @param streamConsumerFactory factory used to create partition group consumers
     */
    public KinesisStreamMetadataProvider(String str, StreamConfig streamConfig, String str2, KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) {
        _clientId = str;
        _partitionId = str2;
        _kinesisConnectionHandler = kinesisConnectionHandler;
        _kinesisStreamConsumerFactory = streamConsumerFactory;
        _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
    }

    /**
     * Returns the number of shards the connection handler currently reports.
     *
     * @param j timeout argument; not used by this implementation
     * @return the size of the shard list
     * @throws RuntimeException wrapping any exception raised while listing shards (the failure is also logged)
     */
    public int fetchPartitionCount(long j) {
        final String failureMessage = "Failed to fetch partition count";
        try {
            return _kinesisConnectionHandler.getShards().size();
        } catch (Exception e) {
            LOGGER.error(failureMessage, e);
            throw new RuntimeException(failureMessage, e);
        }
    }

    /**
     * Resolves the smallest or largest offset of the shard this provider was created for.
     *
     * <p>The shard is the first one in the listing whose id equals {@link #SHARD_ID_PREFIX} followed by
     * the partition id given at construction time.
     *
     * @param offsetCriteria which end of the shard's sequence number range to return
     * @param j timeout argument; not used by this implementation
     * @return an offset holding the shard id and the starting (smallest) or ending (largest) sequence number
     * @throws RuntimeException if no shard with the expected id is listed
     * @throws IllegalArgumentException if the criteria is neither smallest nor largest
     */
    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long j) {
        final String wantedShardId = SHARD_ID_PREFIX + _partitionId;
        Shard matchingShard = _kinesisConnectionHandler.getShards().stream()
                .filter(candidate -> candidate.shardId().equals(wantedShardId))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Failed to find shard for partitionId: " + _partitionId));
        SequenceNumberRange range = matchingShard.sequenceNumberRange();

        if (offsetCriteria.isSmallest()) {
            return new KinesisPartitionGroupOffset(matchingShard.shardId(), range.startingSequenceNumber());
        } else if (offsetCriteria.isLargest()) {
            return new KinesisPartitionGroupOffset(matchingShard.shardId(), range.endingSequenceNumber());
        } else {
            throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
        }
    }

    /**
     * Computes the partition groups (one per shard) that should be consumed next.
     *
     * <p>Shards that already have a consumption status are handled first:
     * <ul>
     *   <li>a shard that is no longer listed is treated as expired: it is recorded as ended, a warning is
     *       logged, and it is dropped from the result;</li>
     *   <li>a status with no end offset restarts from its start offset;</li>
     *   <li>a status with an end offset whose shard has an ending sequence number and has been read to
     *       its end is recorded as ended and dropped from the result;</li>
     *   <li>any other status with an end offset continues from that end offset.</li>
     * </ul>
     * Then every listed shard without a consumption status is added, starting at its first sequence number,
     * provided it has no parent, its parent is not in the current listing, or its parent has ended. This keeps
     * a child shard from being consumed before its parent is finished.
     *
     * @param str client id; not used by this implementation (the id given at construction time is used)
     * @param streamConfig stream configuration; not used by this implementation
     * @param list current consumption status of every known partition group
     * @param i timeout argument; not used by this implementation
     * @return the partition groups to consume, each paired with its start offset
     * @throws IOException if probing a shard for end-of-shard fails
     * @throws TimeoutException if probing a shard for end-of-shard times out
     */
    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String str, StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> list, int i) throws IOException, TimeoutException {
        List<PartitionGroupMetadata> newPartitionGroups = new ArrayList<>();
        // Index the listing by shard id; if an id is listed twice the first occurrence wins.
        Map<String, Shard> shardsById = this._kinesisConnectionHandler.getShards().stream()
                .collect(Collectors.toMap(Shard::shardId, shard -> shard, (first, second) -> first));
        Set<String> knownShardIds = new HashSet<>();
        Set<String> endedShardIds = new HashSet<>();

        for (PartitionGroupConsumptionStatus status : list) {
            KinesisPartitionGroupOffset startCheckpoint = (KinesisPartitionGroupOffset) status.getStartOffset();
            String shardId = startCheckpoint.getShardId();
            knownShardIds.add(shardId);

            Shard shard = shardsById.get(shardId);
            if (shard == null) {
                endedShardIds.add(shardId);
                LOGGER.warn("Kinesis shard with id: {} has expired. Data has been consumed from the shard till sequence number: {}. There can be potential data loss.", shardId, startCheckpoint.getSequenceNumber());
                continue;
            }

            StreamPartitionMsgOffset nextStartOffset;
            StreamPartitionMsgOffset endOffset = status.getEndOffset();
            if (endOffset == null) {
                nextStartOffset = status.getStartOffset();
            } else if (shard.sequenceNumberRange().endingSequenceNumber() != null && consumedEndOfShard(endOffset, status)) {
                // The shard is closed and fully read: remember it so its children become eligible, and
                // do NOT emit a partition group for it (there is no offset left to resume from).
                endedShardIds.add(shardId);
                continue;
            } else {
                nextStartOffset = endOffset;
            }
            newPartitionGroups.add(new PartitionGroupMetadata(status.getPartitionGroupId(), nextStartOffset));
        }

        for (Map.Entry<String, Shard> entry : shardsById.entrySet()) {
            String shardId = entry.getKey();
            if (knownShardIds.contains(shardId)) {
                continue;
            }
            Shard shard = entry.getValue();
            String parentShardId = shard.parentShardId();
            boolean parentFinishedOrAbsent = parentShardId == null
                    || !shardsById.containsKey(parentShardId)
                    || endedShardIds.contains(parentShardId);
            if (parentFinishedOrAbsent) {
                newPartitionGroups.add(new PartitionGroupMetadata(getPartitionGroupIdFromShardId(shardId), new KinesisPartitionGroupOffset(shardId, shard.sequenceNumberRange().startingSequenceNumber())));
            }
        }
        return newPartitionGroups;
    }

    /**
     * Converts a shard id into a numeric partition group id.
     *
     * <p>The {@link #SHARD_ID_PREFIX} and any leading zeros are removed and the remainder is parsed as an
     * int; an id made only of zeros yields {@code 0}.
     *
     * @param shardId the shard id to convert
     * @return the numeric part of the shard id
     * @throws NumberFormatException if the remainder is not a valid int
     */
    private int getPartitionGroupIdFromShardId(String shardId) {
        String numericPart = StringUtils.removeStart(shardId, SHARD_ID_PREFIX);
        String withoutLeadingZeros = StringUtils.stripStart(numericPart, "0");
        return withoutLeadingZeros.isEmpty() ? 0 : Integer.parseInt(withoutLeadingZeros);
    }

    /**
     * Tells whether the shard behind the given consumption status has been read to its end.
     *
     * <p>A consumer is created for the partition group, one fetch is issued from the given offset, and the
     * shard counts as fully consumed when that fetch returns no messages and reports end of partition
     * group. The consumer is always closed before returning; if both the fetch and the close fail, the
     * close failure is attached to the fetch failure as a suppressed exception.
     *
     * @param startCheckpoint offset to fetch from
     * @param partitionGroupConsumptionStatus status identifying the partition group to probe
     * @return {@code true} if the fetch returned zero messages and signalled end of partition group
     * @throws IOException if fetching or closing the consumer fails
     * @throws TimeoutException if the fetch times out
     */
    private boolean consumedEndOfShard(StreamPartitionMsgOffset startCheckpoint, PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) throws IOException, TimeoutException {
        try (PartitionGroupConsumer consumer = this._kinesisStreamConsumerFactory.createPartitionGroupConsumer(this._clientId, partitionGroupConsumptionStatus)) {
            MessageBatch<?> batch = consumer.fetchMessages(startCheckpoint, this._fetchTimeoutMs);
            return batch.getMessageCount() == 0 && batch.isEndOfPartitionGroup();
        }
    }

    /**
     * Builds a lag report for every partition in the given map.
     *
     * <p>For a partition that has last-processed row metadata and a positive last-processed time, the lag
     * is the last-processed time minus the record's ingestion time, rendered as a decimal string.
     * Otherwise the lag is the literal {@code "UNKNOWN"}.
     *
     * @param map consumer state keyed by partition id
     * @return a new map with the same keys, each mapped to its lag state
     */
    public Map<String, PartitionLagState> getCurrentPartitionLagState(Map<String, ConsumerPartitionState> map) {
        Map<String, PartitionLagState> lagByPartition = new HashMap<>();
        map.forEach((partitionId, partitionState) -> {
            RowMetadata lastRow = partitionState.getLastProcessedRowMetadata();
            boolean lagKnown = lastRow != null && partitionState.getLastProcessedTimeMs() > 0;
            String lag = lagKnown
                    ? String.valueOf(partitionState.getLastProcessedTimeMs() - lastRow.getRecordIngestionTimeMs())
                    : "UNKNOWN";
            lagByPartition.put(partitionId, new KinesisConsumerPartitionLag(lag));
        });
        return lagByPartition;
    }

    /**
     * Does nothing: this method has an empty body and releases no resources.
     *
     * <p>NOTE(review): the connection handler and consumer factory held by this instance are not closed
     * here. Confirm whether either owns resources that should be released.
     */
    public void close() {
    }

    /**
     * Lists the streams reported by the connection handler as topic metadata.
     *
     * @return one {@link KinesisTopicMetadata} per stream name, in the order the handler returns them
     */
    public List<StreamMetadataProvider.TopicMetadata> getTopics() {
        List<StreamMetadataProvider.TopicMetadata> topics = new ArrayList<>();
        for (String streamName : _kinesisConnectionHandler.getStreamNames()) {
            topics.add(new KinesisTopicMetadata().setName(streamName));
        }
        return topics;
    }
}
