package org.apache.pinot.plugin.stream.kinesis;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.class */
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
    private static final Logger LOGGER;
    private String _nextStartSequenceNumber;
    private String _nextShardIterator;
    private int _currentSecond;
    private int _numRequestsInCurrentSecond;
    static final /* synthetic */ boolean $assertionsDisabled;

    public KinesisConsumer(KinesisConfig kinesisConfig) {
        super(kinesisConfig);
        this._nextStartSequenceNumber = null;
        this._nextShardIterator = null;
        this._currentSecond = 0;
        this._numRequestsInCurrentSecond = 0;
        LOGGER.info("Created Kinesis consumer with topic: {}, RPS limit: {}, max records per fetch: {}", kinesisConfig.getStreamTopicName(), Integer.valueOf(kinesisConfig.getRpsLimit()), Integer.valueOf(kinesisConfig.getNumMaxRecordsToFetch()));
    }

    @VisibleForTesting
    public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) {
        super(kinesisConfig, kinesisClient);
        this._nextStartSequenceNumber = null;
        this._nextShardIterator = null;
        this._currentSecond = 0;
        this._numRequestsInCurrentSecond = 0;
    }

    /* renamed from: fetchMessages, reason: merged with bridge method [inline-methods] */
    public synchronized KinesisMessageBatch m978fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, int i) {
        try {
            return getKinesisMessageBatch((KinesisPartitionGroupOffset) streamPartitionMsgOffset);
        } catch (ProvisionedThroughputExceededException e) {
            LOGGER.error("Rate limit exceeded while fetching messages from Kinesis stream: {} with threshold: {}", e.getMessage(), Integer.valueOf(this._config.getRpsLimit()));
            return new KinesisMessageBatch(List.of(), (KinesisPartitionGroupOffset) streamPartitionMsgOffset, false);
        }
    }

    private KinesisMessageBatch getKinesisMessageBatch(KinesisPartitionGroupOffset kinesisPartitionGroupOffset) {
        List of;
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset2;
        String shardId = kinesisPartitionGroupOffset.getShardId();
        String sequenceNumber = kinesisPartitionGroupOffset.getSequenceNumber();
        String shardIterator = sequenceNumber.equals(this._nextStartSequenceNumber) ? this._nextShardIterator : this._kinesisClient.getShardIterator((GetShardIteratorRequest) GetShardIteratorRequest.builder().streamName(this._config.getStreamTopicName()).shardId(shardId).startingSequenceNumber(sequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER).mo1169build()).shardIterator();
        if (shardIterator == null) {
            return new KinesisMessageBatch(List.of(), kinesisPartitionGroupOffset, true);
        }
        rateLimitRequests();
        GetRecordsResponse records = this._kinesisClient.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(shardIterator).limit(Integer.valueOf(this._config.getNumMaxRecordsToFetch())).mo1169build());
        List<Record> records2 = records.records();
        if (records2.isEmpty()) {
            of = List.of();
            kinesisPartitionGroupOffset2 = kinesisPartitionGroupOffset;
        } else {
            of = (List) records2.stream().map(record -> {
                return extractStreamMessage(record, shardId);
            }).collect(Collectors.toList());
            StreamMessageMetadata metadata = ((BytesStreamMessage) of.get(of.size() - 1)).getMetadata();
            if (!$assertionsDisabled && metadata == null) {
                throw new AssertionError();
            }
            kinesisPartitionGroupOffset2 = (KinesisPartitionGroupOffset) metadata.getNextOffset();
        }
        if (!$assertionsDisabled && kinesisPartitionGroupOffset2 == null) {
            throw new AssertionError();
        }
        this._nextStartSequenceNumber = kinesisPartitionGroupOffset2.getSequenceNumber();
        this._nextShardIterator = records.nextShardIterator();
        return new KinesisMessageBatch(of, kinesisPartitionGroupOffset2, this._nextShardIterator == null);
    }

    private void rateLimitRequests() {
        long currentTimeMillis = System.currentTimeMillis();
        int seconds = (int) TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis);
        if (seconds != this._currentSecond) {
            this._currentSecond = seconds;
            this._numRequestsInCurrentSecond = 1;
        } else {
            if (this._numRequestsInCurrentSecond != this._config.getRpsLimit()) {
                this._numRequestsInCurrentSecond++;
                return;
            }
            try {
                Thread.sleep(1000 - (currentTimeMillis % 1000));
                this._currentSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
                this._numRequestsInCurrentSecond = 1;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private BytesStreamMessage extractStreamMessage(Record record, String str) {
        byte[] bytes = record.partitionKey().getBytes(StandardCharsets.UTF_8);
        byte[] asByteArray = record.data().asByteArray();
        long epochMilli = record.approximateArrivalTimestamp().toEpochMilli();
        String sequenceNumber = record.sequenceNumber();
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset(str, sequenceNumber);
        StreamMessageMetadata.Builder offset = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(epochMilli).setSerializedValueSize(asByteArray.length).setOffset(kinesisPartitionGroupOffset, kinesisPartitionGroupOffset);
        if (this._config.isPopulateMetadata()) {
            offset.setMetadata(Map.of(KinesisStreamMessageMetadata.APPRX_ARRIVAL_TIMESTAMP_KEY, String.valueOf(epochMilli), KinesisStreamMessageMetadata.SEQUENCE_NUMBER_KEY, sequenceNumber));
        }
        return new BytesStreamMessage(bytes, asByteArray, offset.build());
    }

    @Override // org.apache.pinot.plugin.stream.kinesis.KinesisConnectionHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
    }

    static {
        $assertionsDisabled = !KinesisConsumer.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger((Class<?>) KinesisConsumer.class);
    }
}
