package kafka.server.share;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.server.FetchSession;
import kafka.server.QuotaFactory$UnboundedQuota$;
import kafka.server.ReplicaManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.jdk.javaapi.CollectionConverters;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:kafka/server/share/SharePartitionManager.class */
public class SharePartitionManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SharePartitionManager.class);
    // Share partitions keyed by (group id, topic-id partition); entries are created lazily
    // by maybeProcessFetchQueue the first time a partition is fetched for a group.
    private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
    // Used to fetch records for share partitions and to look up offsets by timestamp.
    private final ReplicaManager replicaManager;
    // Clock used for share-session timestamps and for the partition load-time metric.
    private final Time time;
    // Share session cache; newContext synchronizes on it while updating an existing session.
    private final ShareSessionCache cache;
    // Pending share fetch requests: appended by fetchMessages, polled by maybeProcessFetchQueue.
    private final ConcurrentLinkedQueue<ShareFetchPartitionData> fetchQueue;
    // Flag set (via compareAndSet) while a queued fetch request is being processed and
    // cleared by releaseProcessFetchQueueLock.
    private final AtomicBoolean processFetchQueueLock;
    // The next five fields are handed unchanged to every SharePartition this manager creates.
    private final int recordLockDurationMs;
    // Also closed by close().
    private final Timer timer;
    private final int maxInFlightMessages;
    private final int maxDeliveryCount;
    // Also stopped by close().
    private final Persister persister;
    // Sensors for acknowledgement rates/counts and share partition load time.
    private final ShareGroupMetrics shareGroupMetrics;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartitionManager$ShareFetchPartitionData.class */
    /**
     * Immutable description of one queued share fetch request: who is fetching, which
     * partitions are wanted, the per-partition byte limits, and the future that is completed
     * with the response once the request has been processed.
     */
    public static class ShareFetchPartitionData {
        private final FetchParams fetchParams;
        private final String groupId;
        private final String memberId;
        private final List<TopicIdPartition> topicIdPartitions;
        private final CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future;
        private final Map<TopicIdPartition, Integer> partitionMaxBytes;

        /**
         * @param fetchParams       parameters forwarded to the replica manager fetch
         * @param str               id of the share group issuing the fetch
         * @param str2              id of the group member issuing the fetch
         * @param list              partitions requested by the member
         * @param completableFuture future completed with the per-partition response data
         * @param map               maximum bytes to fetch for each requested partition
         */
        public ShareFetchPartitionData(FetchParams fetchParams, String str, String str2, List<TopicIdPartition> list, CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> completableFuture, Map<TopicIdPartition, Integer> map) {
            // Requester identity.
            this.groupId = str;
            this.memberId = str2;
            // What to fetch and how much.
            this.fetchParams = fetchParams;
            this.topicIdPartitions = list;
            this.partitionMaxBytes = map;
            // Where the result is delivered.
            this.future = completableFuture;
        }
    }

    /* loaded from: input_file:kafka/server/share/SharePartitionManager$ShareGroupMetrics.class */
    /**
     * Registers and records the share group sensors: a meter for acknowledge requests, one
     * meter per record acknowledgement type, and avg/max of the share partition load time.
     */
    static class ShareGroupMetrics {
        public static final String METRICS_GROUP_NAME = "share-group-metrics";
        public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor";
        public static final String SHARE_ACK_RATE = "share-acknowledgement-rate";
        public static final String SHARE_ACK_COUNT = "share-acknowledgement-count";
        public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
        public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
        public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
        public static final String ACK_TYPE = "ack-type";
        public static final String PARTITION_LOAD_TIME_SENSOR = "partition-load-time-sensor";
        public static final String PARTITION_LOAD_TIME_AVG = "partition-load-time-avg";
        public static final String PARTITION_LOAD_TIME_MAX = "partition-load-time-max";

        /** Acknowledgement type id mapped to the name used as the {@code ack-type} metric tag. */
        public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>();

        static {
            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
        }

        private final Time time;
        private final Sensor shareAcknowledgementSensor;
        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
        private final Sensor partitionLoadTimeSensor;

        /**
         * Creates every sensor on the supplied registry.
         *
         * @param metrics registry the sensors and metric names are created on
         * @param time    clock used to compute elapsed partition load time
         */
        public ShareGroupMetrics(Metrics metrics, Time time) {
            this.time = time;

            Sensor ackRequests = metrics.sensor(SHARE_ACK_SENSOR);
            ackRequests.add(new Meter(
                metrics.metricName(SHARE_ACK_RATE, METRICS_GROUP_NAME, "Rate of acknowledge requests."),
                metrics.metricName(SHARE_ACK_COUNT, METRICS_GROUP_NAME, "The number of acknowledge requests.")));
            this.shareAcknowledgementSensor = ackRequests;

            // One sensor per acknowledgement type, tagged with the type's name.
            for (Map.Entry<Byte, String> ackType : RECORD_ACKS_MAP.entrySet()) {
                String typeName = ackType.getValue();
                Sensor perType = metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, typeName));
                perType.add(new Meter(
                    metrics.metricName(RECORD_ACK_RATE, METRICS_GROUP_NAME, "Rate of records acknowledged per acknowledgement type.", ACK_TYPE, typeName),
                    metrics.metricName(RECORD_ACK_COUNT, METRICS_GROUP_NAME, "The number of records acknowledged per acknowledgement type.", ACK_TYPE, typeName)));
                this.recordAcksSensorMap.put(ackType.getKey(), perType);
            }

            Sensor loadTime = metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
            loadTime.add(metrics.metricName(PARTITION_LOAD_TIME_AVG, METRICS_GROUP_NAME, "The average time in milliseconds to load the share partitions."), new Avg());
            loadTime.add(metrics.metricName(PARTITION_LOAD_TIME_MAX, METRICS_GROUP_NAME, "The maximum time in milliseconds to load the share partitions."), new Max());
            this.partitionLoadTimeSensor = loadTime;
        }

        /** Records one acknowledge request. */
        void shareAcknowledgement() {
            this.shareAcknowledgementSensor.record();
        }

        /**
         * Records one acknowledged record of the given type; ids without a registered sensor
         * are ignored.
         *
         * @param b acknowledgement type id
         */
        void recordAcknowledgement(byte b) {
            Sensor perType = this.recordAcksSensorMap.get(Byte.valueOf(b));
            if (perType != null) {
                perType.record();
            }
        }

        /**
         * Records the time elapsed since {@code j} as a partition load time.
         *
         * @param j start timestamp, as previously read from this object's clock via
         *          {@code hiResClockMs()}
         */
        void partitionLoadTime(long j) {
            long elapsedMs = this.time.hiResClockMs() - j;
            this.partitionLoadTimeSensor.record(elapsedMs);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/server/share/SharePartitionManager$SharePartitionKey.class */
    /**
     * Map key identifying a share partition: the share group id together with the
     * topic-id partition. Both components are required to be non-null.
     */
    public static class SharePartitionKey {
        private final String groupId;
        private final TopicIdPartition topicIdPartition;

        /**
         * @param str              share group id; must not be null
         * @param topicIdPartition topic-id partition; must not be null
         * @throws NullPointerException if either argument is null
         */
        public SharePartitionKey(String str, TopicIdPartition topicIdPartition) {
            this.groupId = Objects.requireNonNull(str);
            this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.groupId, this.topicIdPartition);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            SharePartitionKey other = (SharePartitionKey) obj;
            return this.groupId.equals(other.groupId) && Objects.equals(this.topicIdPartition, other.topicIdPartition);
        }

        @Override
        public String toString() {
            // The group id is wrapped in a balanced pair of single quotes (the closing quote
            // was previously missing).
            return "SharePartitionKey{groupId='" + this.groupId + "', topicIdPartition=" + this.topicIdPartition + '}';
        }
    }

    /**
     * Creates a manager that starts with an empty, concurrent share partition cache.
     *
     * @param replicaManager    replica manager used to fetch records
     * @param time              clock
     * @param shareSessionCache cache of share sessions
     * @param i                 record lock duration in milliseconds
     * @param i2                maximum delivery count
     * @param i3                maximum in-flight messages
     * @param persister         share state persister
     * @param metrics           metrics registry; must not be null
     */
    public SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, int i, int i2, int i3, Persister persister, Metrics metrics) {
        this(replicaManager, time, shareSessionCache, new ConcurrentHashMap<>(), i, i2, i3, persister, metrics);
    }

    /**
     * Creates a manager over the given partition cache with a fresh fetch queue and a
     * system timer (wrapped in a reaper) for record lock timeouts.
     *
     * @param replicaManager    replica manager used to fetch records
     * @param time              clock
     * @param shareSessionCache cache of share sessions
     * @param map               share partition cache to use
     * @param i                 record lock duration in milliseconds
     * @param i2                maximum delivery count
     * @param i3                maximum in-flight messages
     * @param persister         share state persister
     * @param metrics           metrics registry; must not be null
     */
    private SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, Map<SharePartitionKey, SharePartition> map, int i, int i2, int i3, Persister persister, Metrics metrics) {
        // Supply the default queue and timer, then let the fully parameterized constructor
        // perform the field assignments.
        this(
            replicaManager,
            time,
            shareSessionCache,
            map,
            new ConcurrentLinkedQueue<>(),
            i,
            new SystemTimerReaper("share-group-lock-timeout-reaper", new SystemTimer("share-group-lock-timeout")),
            i2,
            i3,
            persister,
            metrics);
    }

    /**
     * Fully parameterized constructor: every collaborator, including the fetch queue and
     * the timer, is supplied by the caller.
     *
     * @param replicaManager        replica manager used to fetch records
     * @param time                  clock
     * @param shareSessionCache     cache of share sessions
     * @param map                   share partition cache to use
     * @param concurrentLinkedQueue queue of pending share fetch requests
     * @param i                     record lock duration in milliseconds
     * @param timer                 timer handed to created share partitions
     * @param i2                    maximum delivery count
     * @param i3                    maximum in-flight messages
     * @param persister             share state persister
     * @param metrics               metrics registry; must not be null
     * @throws NullPointerException if {@code metrics} is null
     */
    SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache shareSessionCache, Map<SharePartitionKey, SharePartition> map, ConcurrentLinkedQueue<ShareFetchPartitionData> concurrentLinkedQueue, int i, Timer timer, int i2, int i3, Persister persister, Metrics metrics) {
        // Collaborators.
        this.replicaManager = replicaManager;
        this.time = time;
        this.timer = timer;
        this.persister = persister;

        // Caches and the fetch pipeline.
        this.cache = shareSessionCache;
        this.partitionCacheMap = map;
        this.fetchQueue = concurrentLinkedQueue;
        this.processFetchQueueLock = new AtomicBoolean(false);

        // Share partition tuning.
        this.recordLockDurationMs = i;
        this.maxDeliveryCount = i2;
        this.maxInFlightMessages = i3;

        this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
    }

    /**
     * Queues a share fetch request and triggers processing of the fetch queue.
     *
     * @param str         share group id
     * @param str2        member id
     * @param fetchParams parameters forwarded to the replica manager fetch
     * @param list        partitions to fetch
     * @param map         maximum bytes to fetch per partition
     * @return a future completed with the per-partition response data once the request has
     *         been processed
     */
    public CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages(String str, String str2, FetchParams fetchParams, List<TopicIdPartition> list, Map<TopicIdPartition, Integer> map) {
        log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", list, str, fetchParams);
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> response = new CompletableFuture<>();
        ShareFetchPartitionData request = new ShareFetchPartitionData(fetchParams, str, str2, list, response, map);
        this.fetchQueue.add(request);
        maybeProcessFetchQueue();
        return response;
    }

    /**
     * Acknowledges the given batches on each share partition of the group.
     *
     * <p>Partitions with no cached share partition are reported as
     * {@code UNKNOWN_TOPIC_OR_PARTITION}. For partitions acknowledged without error, every
     * acknowledgement type in the batches is recorded in the per-type metrics.
     *
     * @param str  member id passed to the share partition
     * @param str2 share group id
     * @param map  acknowledgement batches per partition
     * @return a future yielding, for every requested partition, response data carrying the
     *         partition index and the resulting error code
     */
    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> acknowledge(String str, String str2, Map<TopicIdPartition, List<ShareAcknowledgementBatch>> map) {
        log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", map.keySet(), str2);
        this.shareGroupMetrics.shareAcknowledgement();
        Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
        map.forEach((topicIdPartition, batches) -> {
            SharePartition sharePartition = this.partitionCacheMap.get(sharePartitionKey(str2, topicIdPartition));
            if (sharePartition == null) {
                futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
                return;
            }
            futures.put(topicIdPartition, sharePartition.acknowledge(str, batches).thenApply(failure -> {
                if (failure.isPresent()) {
                    return Errors.forException((Throwable) failure.get());
                }
                // Only successful acknowledgements count towards the per-type record metrics.
                batches.forEach(batch -> batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
                return Errors.NONE;
            }));
        });
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(ignored -> {
            Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
            // Every future is already complete here, so join() does not block.
            futures.forEach((topicIdPartition, future) -> result.put(topicIdPartition,
                new ShareAcknowledgeResponseData.PartitionData()
                    .setPartitionIndex(topicIdPartition.partition())
                    .setErrorCode(future.join().code())));
            return result;
        });
    }

    /**
     * Releases the records acquired by a member on every partition cached in its share
     * session.
     *
     * <p>Partitions with no cached share partition are reported as
     * {@code UNKNOWN_TOPIC_OR_PARTITION}.
     *
     * @param str  share group id
     * @param str2 member id; parsed with {@code Uuid.fromString} to look up the share session
     * @return a future yielding, for every partition in the member's share session, response
     *         data carrying the partition index and the resulting error code; an empty map
     *         when the session has no cached partitions
     */
    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> releaseAcquiredRecords(String str, String str2) {
        log.trace("Release acquired records request for groupId: {}, memberId: {}", str, str2);
        List<TopicIdPartition> sessionPartitions = cachedTopicIdPartitionsInShareSession(str, Uuid.fromString(str2));
        if (sessionPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>();
        sessionPartitions.forEach(topicIdPartition -> {
            SharePartition sharePartition = this.partitionCacheMap.get(sharePartitionKey(str, topicIdPartition));
            if (sharePartition == null) {
                log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", str, topicIdPartition);
                futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
                return;
            }
            futures.put(topicIdPartition, sharePartition.releaseAcquiredRecords(str2).thenApply(failure ->
                failure.isPresent() ? Errors.forException((Throwable) failure.get()) : Errors.NONE));
        });
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(ignored -> {
            Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
            // Every future is already complete here, so join() does not block.
            futures.forEach((topicIdPartition, future) -> result.put(topicIdPartition,
                new ShareAcknowledgeResponseData.PartitionData()
                    .setPartitionIndex(topicIdPartition.partition())
                    .setErrorCode(future.join().code())));
            return result;
        });
    }

    /**
     * Builds the fetch context for a share fetch request, creating, updating or removing
     * the member's share session as dictated by the request metadata.
     *
     * <ul>
     *   <li>Full request with epoch {@code -1}: the session is removed and a
     *       {@code FinalContext} is returned; the request must not carry fetchable
     *       partitions.</li>
     *   <li>Full request with any other epoch: any existing session is removed and a new
     *       one is created.</li>
     *   <li>Non-full request: the existing session is validated, updated and its epoch
     *       advanced, all while holding the cache monitor.</li>
     * </ul>
     *
     * @param str                share group id
     * @param map                requested partitions with their fetch data; entries whose
     *                           {@code maxBytes} is not positive are ignored
     * @param list               partitions passed to the session update for non-full requests
     * @param shareFetchMetadata member id and epoch of the request
     * @return the context to use for this request
     */
    public ShareFetchContext newContext(String str, Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> map, List<TopicIdPartition> list, ShareFetchMetadata shareFetchMetadata) {
        // Keep only partitions that actually ask for data.
        Map<TopicIdPartition, ShareFetchRequest.SharePartitionData> fetchable = new HashMap<>();
        map.forEach((partition, requested) -> {
            if (requested.maxBytes > 0) {
                fetchable.put(partition, requested);
            }
        });

        if (!shareFetchMetadata.isFull()) {
            // Update of an already existing session.
            synchronized (this.cache) {
                ShareSessionKey sessionKey = shareSessionKey(str, shareFetchMetadata.memberId());
                ShareSession session = this.cache.get(sessionKey);
                if (session == null) {
                    log.error("Share session error for {}: no such share session found", sessionKey);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                if (session.epoch != shareFetchMetadata.epoch()) {
                    log.debug("Share session error for {}: expected epoch {}, but got {} instead", sessionKey, Integer.valueOf(session.epoch), Integer.valueOf(shareFetchMetadata.epoch()));
                    throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                }
                Map<ShareSession.ModifiedTopicIdPartitionType, List<TopicIdPartition>> changes = session.update(fetchable, list);
                this.cache.touch(session, this.time.milliseconds());
                session.epoch = ShareFetchMetadata.nextEpoch(session.epoch);
                log.debug("Created a new ShareSessionContext for session key {}, epoch {}: added {}, updated {}, removed {}", session.key(), Integer.valueOf(session.epoch), partitionsToLogString(changes.get(ShareSession.ModifiedTopicIdPartitionType.ADDED)), partitionsToLogString(changes.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)), partitionsToLogString(changes.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED)));
                return new ShareSessionContext(shareFetchMetadata, session);
            }
        }

        ShareSessionKey sessionKey = shareSessionKey(str, shareFetchMetadata.memberId());
        if (shareFetchMetadata.epoch() == -1) {
            // Closing request: it may not ask for data, and the session is dropped.
            if (!fetchable.isEmpty()) {
                throw Errors.INVALID_REQUEST.exception();
            }
            ShareFetchContext closing = new FinalContext();
            if (this.cache.remove(sessionKey) != null) {
                log.debug("Removed share session with key {}", sessionKey);
            }
            return closing;
        }

        // Opening request: replace whatever session existed with a fresh one.
        if (this.cache.remove(sessionKey) != null) {
            log.debug("Removed share session with key {}", sessionKey);
        }
        ImplicitLinkedHashCollection<CachedSharePartition> cachedPartitions = new ImplicitLinkedHashCollection<>(fetchable.size());
        fetchable.forEach((partition, requested) ->
            cachedPartitions.mustAdd(new CachedSharePartition(partition, requested, false)));
        ShareSessionKey createdKey = this.cache.maybeCreateSession(str, shareFetchMetadata.memberId(), this.time.milliseconds(), cachedPartitions);
        if (createdKey == null) {
            log.error("Could not create a share session for group {} member {}", str, shareFetchMetadata.memberId());
            throw Errors.SHARE_SESSION_NOT_FOUND.exception();
        }
        ShareFetchContext opened = new ShareSessionContext(shareFetchMetadata, fetchable);
        log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share session will be started.", createdKey, false, partitionsToLogString(fetchable.keySet()));
        return opened;
    }

    /**
     * Lists the topic-id partitions cached in the share session of the given member.
     *
     * @param str  share group id
     * @param uuid member id
     * @return the session's partitions, or an empty list when no session exists
     */
    List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String str, Uuid uuid) {
        ShareSessionKey sessionKey = shareSessionKey(str, uuid);
        ShareSession session = this.cache.get(sessionKey);
        if (session == null) {
            return Collections.emptyList();
        }
        List<TopicIdPartition> partitions = new ArrayList<>();
        session.partitionMap().forEach(cached -> {
            Uuid topicId = cached.topicId();
            TopicPartition topicPartition = new TopicPartition(cached.topic(), cached.partition());
            partitions.add(new TopicIdPartition(topicId, topicPartition));
        });
        return partitions;
    }

    /**
     * Closes the timer and stops the persister.
     *
     * <p>The persister is stopped even if closing the timer throws, so a failing timer
     * cannot leave the persister running.
     *
     * @throws Exception if closing the timer or stopping the persister fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            this.timer.close();
        } finally {
            this.persister.stop();
        }
    }

    /**
     * Builds the share session cache key for a group member.
     *
     * @param str  share group id
     * @param uuid member id
     * @return a new key for the given pair
     */
    private ShareSessionKey shareSessionKey(String str, Uuid uuid) {
        ShareSessionKey sessionKey = new ShareSessionKey(str, uuid);
        return sessionKey;
    }

    /**
     * Renders partitions for logging by delegating to
     * {@code FetchSession.partitionsToLogString}, passing whether trace logging is enabled.
     *
     * @param collection partitions to render
     * @return the log representation
     */
    private static String partitionsToLogString(Collection<TopicIdPartition> collection) {
        boolean traceEnabled = log.isTraceEnabled();
        return FetchSession.partitionsToLogString(collection, traceEnabled);
    }

    /**
     * Processes the next queued share fetch request, if no other caller is already doing so.
     *
     * <p>For every requested partition the share partition is looked up (or created and its
     * load time recorded) and its fetch lock is attempted. Partitions whose lock was obtained
     * and which can acquire more records are fetched through the replica manager; the
     * request future is completed from the fetch callback, after which the partition fetch
     * locks and the queue-processing flag are released. If no partition is fetchable the
     * request completes with an empty map. Remaining queued requests are processed by
     * re-invoking this method.
     *
     * <p>If processing throws, the request future is completed exceptionally so the caller
     * is not left waiting on a future that would never complete, and the locks are released.
     */
    void maybeProcessFetchQueue() {
        if (!this.processFetchQueueLock.compareAndSet(false, true)) {
            // Another caller currently holds the queue-processing flag.
            return;
        }
        // LinkedHashMap keeps the fetchable partitions in the order the request listed them.
        Map<TopicIdPartition, FetchRequest.PartitionData> fetchable = new LinkedHashMap<>();
        ShareFetchPartitionData request = this.fetchQueue.poll();
        if (request == null) {
            releaseProcessFetchQueueLock();
            return;
        }
        try {
            request.topicIdPartitions.forEach(topicIdPartition -> {
                SharePartitionKey key = sharePartitionKey(request.groupId, topicIdPartition);
                SharePartition sharePartition = this.partitionCacheMap.computeIfAbsent(key, ignored -> {
                    long loadStartMs = this.time.hiResClockMs();
                    SharePartition created = new SharePartition(request.groupId, topicIdPartition, this.maxInFlightMessages, this.maxDeliveryCount, this.recordLockDurationMs, this.timer, this.time, this.persister);
                    this.shareGroupMetrics.partitionLoadTime(loadStartMs);
                    return created;
                });
                int maxBytes = request.partitionMaxBytes.getOrDefault(topicIdPartition, 0);
                if (sharePartition.maybeAcquireFetchLock()) {
                    if (sharePartition.canAcquireRecords()) {
                        fetchable.put(topicIdPartition, new FetchRequest.PartitionData(topicIdPartition.topicId(), sharePartition.nextFetchOffset(), 0L, maxBytes, Optional.empty()));
                    } else {
                        // Nothing can be acquired right now; give the fetch lock back immediately.
                        sharePartition.releaseFetchLock();
                        log.info("Record lock partition limit exceeded for SharePartition with key {}, cannot acquire more records", key);
                    }
                }
            });

            if (fetchable.isEmpty()) {
                // No partition could be fetched: answer with an empty result and move on.
                request.future.complete(Collections.emptyMap());
                releaseProcessFetchQueueLock();
                if (!this.fetchQueue.isEmpty()) {
                    maybeProcessFetchQueue();
                }
                return;
            }

            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", fetchable, request.groupId, request.fetchParams);
            this.replicaManager.fetchMessages(request.fetchParams, CollectionConverters.asScala(fetchable.entrySet().stream().map(entry -> {
                return new Tuple2<>(entry.getKey(), entry.getValue());
            }).collect(Collectors.toList())), QuotaFactory$UnboundedQuota$.MODULE$, seq -> {
                log.trace("Data successfully retrieved by replica manager: {}", seq);
                processFetchResponse(request, CollectionConverters.asJava(seq)).whenComplete((result, th) -> {
                    if (th != null) {
                        log.error("Error processing fetch response for share partitions", th);
                        request.future.completeExceptionally(th);
                    } else {
                        request.future.complete(result);
                    }
                    // Release the locks so the next queued request can be processed.
                    releaseFetchQueueAndPartitionsLock(request.groupId, fetchable.keySet());
                });
                return BoxedUnit.UNIT;
            });
            if (!this.fetchQueue.isEmpty()) {
                maybeProcessFetchQueue();
            }
        } catch (Exception e) {
            log.error("Error processing fetch queue for share partitions", e);
            // Fail the request instead of leaving its future incomplete forever; this is a
            // no-op if the future has already been completed.
            request.future.completeExceptionally(e);
            releaseFetchQueueAndPartitionsLock(request.groupId, fetchable.keySet());
        }
    }

    /**
     * Acquires the fetched records on each share partition for the requesting member and
     * turns the outcome into share fetch response data.
     *
     * <p>Per partition: an acquisition failure yields only the partition index and the
     * mapped error code. When the fetch itself reported {@code OFFSET_OUT_OF_RANGE}, the
     * share partition is updated with the offset returned by
     * {@link #offsetForEarliestTimestamp} and an empty, error-free response is returned.
     * Otherwise the fetched records, the fetch error code and the acquired records are
     * returned.
     *
     * @param shareFetchPartitionData the request being answered (group and member ids are
     *                                read from it)
     * @param list                    fetch results from the replica manager, one tuple per
     *                                partition
     * @return a future yielding the response data for every partition in {@code list}
     */
    CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> processFetchResponse(ShareFetchPartitionData shareFetchPartitionData, List<Tuple2<TopicIdPartition, FetchPartitionData>> list) {
        Map<TopicIdPartition, CompletableFuture<ShareFetchResponseData.PartitionData>> futures = new HashMap<>();
        list.forEach(tuple2 -> {
            TopicIdPartition topicIdPartition = (TopicIdPartition) tuple2._1;
            FetchPartitionData fetchPartitionData = (FetchPartitionData) tuple2._2;
            SharePartition sharePartition = this.partitionCacheMap.get(sharePartitionKey(shareFetchPartitionData.groupId, topicIdPartition));
            futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId, fetchPartitionData).handle((acquiredRecords, th) -> {
                log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", topicIdPartition, shareFetchPartitionData, acquiredRecords);
                ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition());
                if (th != null) {
                    partitionData.setErrorCode(Errors.forException(th).code());
                    return partitionData;
                }
                if (fetchPartitionData.error.code() != Errors.OFFSET_OUT_OF_RANGE.code()) {
                    partitionData.setPartitionIndex(topicIdPartition.partition()).setRecords(fetchPartitionData.records).setErrorCode(fetchPartitionData.error.code()).setAcquiredRecords(acquiredRecords).setAcknowledgeErrorCode(Errors.NONE.code());
                    return partitionData;
                }
                // Out-of-range fetch offset: update the share partition with the earliest
                // offset and report an empty, error-free response for this partition.
                sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition));
                partitionData.setPartitionIndex(topicIdPartition.partition()).setRecords(null).setErrorCode(Errors.NONE.code()).setAcquiredRecords(Collections.emptyList()).setAcknowledgeErrorCode(Errors.NONE.code());
                return partitionData;
            }));
        });
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(ignored -> {
            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = new HashMap<>();
            // Every future is already complete here, so join() does not block.
            futures.forEach((topicIdPartition, future) -> result.put(topicIdPartition, future.join()));
            return result;
        });
    }

    /**
     * Releases the fetch lock of each given share partition and then clears the
     * queue-processing flag.
     *
     * @param str share group id the partitions belong to
     * @param set partitions whose fetch locks are released
     */
    void releaseFetchQueueAndPartitionsLock(String str, Set<TopicIdPartition> set) {
        set.forEach(lockedPartition -> {
            SharePartitionKey key = sharePartitionKey(str, lockedPartition);
            SharePartition sharePartition = this.partitionCacheMap.get(key);
            sharePartition.releaseFetchLock();
        });
        releaseProcessFetchQueueLock();
    }

    /** Clears the queue-processing flag so that another caller may process the fetch queue. */
    private void releaseProcessFetchQueueLock() {
        processFetchQueueLock.set(false);
    }

    /**
     * Builds the partition cache key for a group and topic-id partition.
     *
     * @param str              share group id
     * @param topicIdPartition topic-id partition
     * @return a new key for the given pair
     */
    private SharePartitionKey sharePartitionKey(String str, TopicIdPartition topicIdPartition) {
        SharePartitionKey key = new SharePartitionKey(str, topicIdPartition);
        return key;
    }

    /**
     * Asks the replica manager for the offset matching timestamp {@code -2} on the given
     * partition (presumably the "earliest" sentinel, as the method name suggests).
     *
     * @param topicIdPartition partition to query
     * @return the offset reported by the replica manager, or {@code 0} when it reports none
     */
    long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) {
        Option<FileRecords.TimestampAndOffset> earliest = this.replicaManager.fetchOffsetForTimestamp(topicIdPartition.topicPartition(), -2L, Option.empty(), Optional.empty(), true);
        return earliest.isEmpty() ? 0L : earliest.get().offset;
    }
}
