package org.apache.pinot.plugin.stream.kinesis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.class */
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KinesisConsumer.class);
    private final String _streamTopicName;
    private final int _numMaxRecordsToFetch;
    private final ExecutorService _executorService;
    private final ShardIteratorType _shardIteratorType;

    public KinesisConsumer(KinesisConfig kinesisConfig) {
        super(kinesisConfig);
        this._streamTopicName = kinesisConfig.getStreamTopicName();
        this._numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
        this._shardIteratorType = kinesisConfig.getShardIteratorType();
        this._executorService = Executors.newSingleThreadExecutor();
    }

    @VisibleForTesting
    public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) {
        super(kinesisConfig, kinesisClient);
        this._kinesisClient = kinesisClient;
        this._streamTopicName = kinesisConfig.getStreamTopicName();
        this._numMaxRecordsToFetch = kinesisConfig.getNumMaxRecordsToFetch();
        this._shardIteratorType = kinesisConfig.getShardIteratorType();
        this._executorService = Executors.newSingleThreadExecutor();
    }

    @Override // org.apache.pinot.spi.stream.PartitionGroupConsumer
    public KinesisRecordsBatch fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2, int i) {
        ArrayList arrayList = new ArrayList();
        Future submit = this._executorService.submit(() -> {
            return getResult(streamPartitionMsgOffset, streamPartitionMsgOffset2, arrayList);
        });
        try {
            return (KinesisRecordsBatch) submit.get(i, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            submit.cancel(true);
            return handleException((KinesisPartitionGroupOffset) streamPartitionMsgOffset, arrayList);
        } catch (Exception e2) {
            return handleException((KinesisPartitionGroupOffset) streamPartitionMsgOffset, arrayList);
        }
    }

    private KinesisRecordsBatch getResult(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2, List<Record> list) {
        KinesisPartitionGroupOffset kinesisPartitionGroupOffset = (KinesisPartitionGroupOffset) streamPartitionMsgOffset;
        try {
            if (this._kinesisClient == null) {
                createConnection();
            }
            Map<String, String> shardToStartSequenceMap = kinesisPartitionGroupOffset.getShardToStartSequenceMap();
            Preconditions.checkState(shardToStartSequenceMap.size() == 1, "Only 1 shard per consumer supported. Found: %s, in startShardToSequenceMap", shardToStartSequenceMap.keySet());
            Map.Entry<String, String> next = shardToStartSequenceMap.entrySet().iterator().next();
            String shardIterator = getShardIterator(next.getKey(), next.getValue());
            String str = null;
            if (streamPartitionMsgOffset2 != null) {
                Map<String, String> shardToStartSequenceMap2 = ((KinesisPartitionGroupOffset) streamPartitionMsgOffset2).getShardToStartSequenceMap();
                Preconditions.checkState(shardToStartSequenceMap2.size() == 1, "Only 1 shard per consumer supported. Found: %s, in endShardToSequenceMap", shardToStartSequenceMap2.keySet());
                str = shardToStartSequenceMap2.values().iterator().next();
            }
            boolean z = false;
            while (true) {
                if (shardIterator == null) {
                    break;
                }
                GetRecordsResponse records = this._kinesisClient.getRecords((GetRecordsRequest) GetRecordsRequest.builder().shardIterator(shardIterator).mo17416build());
                if (!records.records().isEmpty()) {
                    list.addAll(records.records());
                    String sequenceNumber = list.get(list.size() - 1).sequenceNumber();
                    if (str != null) {
                        if (str.compareTo(sequenceNumber) <= 0) {
                            break;
                        }
                    }
                    if (list.size() >= this._numMaxRecordsToFetch) {
                        break;
                    }
                }
                if (records.hasChildShards() && !records.childShards().isEmpty()) {
                    z = true;
                    break;
                }
                shardIterator = records.nextShardIterator();
                if (Thread.interrupted()) {
                    break;
                }
            }
            return new KinesisRecordsBatch(list, next.getKey(), z);
        } catch (IllegalStateException e) {
            debugOrLogWarning("Illegal state exception, connection is broken", e);
            return handleException(kinesisPartitionGroupOffset, list);
        } catch (AbortedException e2) {
            debugOrLogWarning("Task aborted due to exception", e2);
            return handleException(kinesisPartitionGroupOffset, list);
        } catch (ExpiredIteratorException e3) {
            debugOrLogWarning("ShardIterator expired while trying to fetch records", e3);
            return handleException(kinesisPartitionGroupOffset, list);
        } catch (InvalidArgumentException | ResourceNotFoundException e4) {
            LOGGER.error("Encountered AWS error while attempting to fetch records", e4);
            return handleException(kinesisPartitionGroupOffset, list);
        } catch (ProvisionedThroughputExceededException e5) {
            debugOrLogWarning("The request rate for the stream is too high", e5);
            return handleException(kinesisPartitionGroupOffset, list);
        } catch (KinesisException e6) {
            debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e6);
            throw new RuntimeException(e6);
        } catch (Throwable th) {
            LOGGER.error("Unknown fetchRecords exception", th);
            throw new RuntimeException(th);
        }
    }

    private void debugOrLogWarning(String str, Throwable th) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(str, th);
        } else {
            LOGGER.warn(str + ": " + th.getMessage());
        }
    }

    private KinesisRecordsBatch handleException(KinesisPartitionGroupOffset kinesisPartitionGroupOffset, List<Record> list) {
        String key = kinesisPartitionGroupOffset.getShardToStartSequenceMap().entrySet().iterator().next().getKey();
        if (!list.isEmpty()) {
            String sequenceNumber = list.get(list.size() - 1).sequenceNumber();
            HashMap hashMap = new HashMap(kinesisPartitionGroupOffset.getShardToStartSequenceMap());
            hashMap.put((String) hashMap.keySet().iterator().next(), sequenceNumber);
        }
        return new KinesisRecordsBatch(list, key, false);
    }

    private String getShardIterator(String str, String str2) {
        GetShardIteratorRequest.Builder shardId = GetShardIteratorRequest.builder().streamName(this._streamTopicName).shardId(str);
        return this._kinesisClient.getShardIterator((GetShardIteratorRequest) (str2 != null ? shardId.startingSequenceNumber(str2).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) : shardId.shardIteratorType(this._shardIteratorType)).mo17416build()).shardIterator();
    }

    @Override // org.apache.pinot.plugin.stream.kinesis.KinesisConnectionHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        shutdownAndAwaitTermination();
    }

    void shutdownAndAwaitTermination() {
        this._executorService.shutdown();
        try {
            if (!this._executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                this._executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            this._executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
