package org.apache.kafka.server;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/server/AssignmentsManager.class */
public final class AssignmentsManager {
    /**
     * Backoff used by the public constructor. The arguments are 100 ms and 10 s expressed in
     * nanoseconds, a factor of 2 and 0.02.
     * NOTE(review): presumably (initial interval, multiplier, max interval, jitter) — confirm
     * against the ExponentialBackoff constructor.
     */
    static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(TimeUnit.MILLISECONDS.toNanos(100), 2, TimeUnit.SECONDS.toNanos(10), 0.02d);
    /**
     * Two minutes, in nanoseconds. handleAssignmentResponse only logs a per-partition failure
     * once the assignment is older than this (or when debug logging is enabled).
     */
    static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);
    /** Name of the gauge that reports {@link #numPending()}. */
    static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC = metricName("QueuedReplicaToDirAssignments");
    /** Tag under which the deferred MaybeSendAssignmentsEvent is scheduled on the event queue. */
    static final String MAYBE_SEND_ASSIGNMENTS_EVENT = "MaybeSendAssignmentsEvent";
    private final Logger log;
    /** Backoff handed to AssignmentsManagerDeadlineFunction when (re)scheduling a send. */
    private final ExponentialBackoff backoff;
    private final Time time;
    /** Channel used to send AssignReplicasToDirs requests; started in the constructor, shut down by ShutdownEvent. */
    private final NodeToControllerChannelManager channelManager;
    /** Id of this node; used as the broker id of outgoing requests and in log prefixes. */
    private final int nodeId;
    private final Supplier<MetadataImage> metadataImageSupplier;
    /** Maps a directory id to a description; only used for trace logging in onAssignment. */
    private final Function<Uuid, String> directoryIdToDescription;
    /** Assignments waiting to be sent, keyed by partition. */
    private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;
    /** The batch handed to the most recent sendAssignments call; reset to an empty map by handleResponse. */
    private volatile Map<TopicIdPartition, Assignment> inflight;
    private final MetricsRegistry metricsRegistry;
    /** Incremented by handleResponse on every global (request-level) failure and reset to 0 otherwise. */
    private int previousGlobalFailures;
    private final KafkaEventQueue eventQueue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$CompletionHandler.class */
    public class CompletionHandler implements ControllerRequestCompletionHandler {
        /** The batch of assignments that was sent with the request this handler is attached to. */
        private final Map<TopicIdPartition, Assignment> sent;

        CompletionHandler(Map<TopicIdPartition, Assignment> sentAssignments) {
            this.sent = sentAssignments;
        }

        /**
         * Enqueues a HandleResponseEvent carrying no response, which the manager treats as a
         * timeout.
         */
        @Override
        public void onTimeout() {
            enqueue(Optional.empty());
        }

        /**
         * Enqueues a HandleResponseEvent carrying the received response.
         *
         * @param clientResponse the response delivered by the channel manager; must not be null
         */
        @Override
        public void onComplete(ClientResponse clientResponse) {
            enqueue(Optional.of(clientResponse));
        }

        /** Appends the response-handling event for this batch to the manager's event queue. */
        private void enqueue(Optional<ClientResponse> outcome) {
            HandleResponseEvent event = new HandleResponseEvent(sent, outcome);
            eventQueue.append(event);
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$HandleResponseEvent.class */
    private class HandleResponseEvent implements EventQueue.Event {
        private final Map<TopicIdPartition, Assignment> sent;
        private final Optional<ClientResponse> response;

        HandleResponseEvent(Map<TopicIdPartition, Assignment> map, Optional<ClientResponse> optional) {
            this.sent = map;
            this.response = optional;
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() {
            try {
                try {
                    AssignmentsManager.this.handleResponse(this.sent, this.response);
                    if (!AssignmentsManager.this.ready.isEmpty()) {
                        AssignmentsManager.this.rescheduleMaybeSendAssignmentsEvent(AssignmentsManager.this.time.nanoseconds());
                    }
                } catch (Exception e) {
                    AssignmentsManager.this.log.error("Unexpected exception in HandleResponseEvent", (Throwable) e);
                    if (!AssignmentsManager.this.ready.isEmpty()) {
                        AssignmentsManager.this.rescheduleMaybeSendAssignmentsEvent(AssignmentsManager.this.time.nanoseconds());
                    }
                }
            } catch (Throwable th) {
                if (!AssignmentsManager.this.ready.isEmpty()) {
                    AssignmentsManager.this.rescheduleMaybeSendAssignmentsEvent(AssignmentsManager.this.time.nanoseconds());
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$MaybeSendAssignmentsEvent.class */
    public class MaybeSendAssignmentsEvent implements EventQueue.Event {
        private MaybeSendAssignmentsEvent() {
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() {
            try {
                AssignmentsManager.this.maybeSendAssignments();
            } catch (Exception e) {
                AssignmentsManager.this.log.error("Unexpected exception in MaybeSendAssignmentsEvent", (Throwable) e);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/server/AssignmentsManager$ShutdownEvent.class */
    private class ShutdownEvent implements EventQueue.Event {
        private ShutdownEvent() {
        }

        @Override // org.apache.kafka.queue.EventQueue.Event
        public void run() {
            AssignmentsManager.this.log.info("shutting down.");
            try {
                AssignmentsManager.this.channelManager.shutdown();
            } catch (Exception e) {
                AssignmentsManager.this.log.error("Unexpected exception shutting down NodeToControllerChannelManager", (Throwable) e);
            }
            try {
                AssignmentsManager.this.metricsRegistry.removeMetric(AssignmentsManager.QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
            } catch (Exception e2) {
                AssignmentsManager.this.log.error("Unexpected exception removing metrics.", (Throwable) e2);
            }
        }
    }

    /**
     * Builds the Yammer metric name for a metric of this class.
     *
     * @param str the metric's own name
     * @return the name produced by KafkaYammerMetrics for group "org.apache.kafka.server" and
     *         type "AssignmentsManager"
     */
    static MetricName metricName(String str) {
        final String group = "org.apache.kafka.server";
        final String type = "AssignmentsManager";
        return KafkaYammerMetrics.getMetricName(group, type, str);
    }

    /**
     * Creates a manager that uses {@code STANDARD_BACKOFF} and the default Yammer metrics
     * registry; all other arguments are forwarded unchanged to the package-private constructor.
     *
     * @param time                           clock used for timestamps and by the event queue
     * @param nodeToControllerChannelManager channel used to send requests to the controller
     * @param i                              id of this node
     * @param supplier                       supplies the current metadata image
     * @param function                       maps a directory id to a description for logging
     */
    public AssignmentsManager(Time time, NodeToControllerChannelManager nodeToControllerChannelManager, int i, Supplier<MetadataImage> supplier, Function<Uuid, String> function) {
        this(
            STANDARD_BACKOFF,
            time,
            nodeToControllerChannelManager,
            i,
            supplier,
            function,
            KafkaYammerMetrics.defaultRegistry()
        );
    }

    /**
     * Creates the manager, registers the queued-assignments gauge, creates the event queue
     * (with a ShutdownEvent as its final event argument) and starts the controller channel
     * manager.
     *
     * @param exponentialBackoff             backoff used when scheduling send attempts
     * @param time                           clock used for timestamps and by the event queue
     * @param nodeToControllerChannelManager channel used to send requests to the controller;
     *                                       started by this constructor
     * @param i                              id of this node
     * @param supplier                       supplies the current metadata image
     * @param function                       maps a directory id to a description for logging
     * @param metricsRegistry                registry in which the gauge is registered
     */
    AssignmentsManager(ExponentialBackoff exponentialBackoff, Time time, NodeToControllerChannelManager nodeToControllerChannelManager, int i, Supplier<MetadataImage> supplier, Function<Uuid, String> function, MetricsRegistry metricsRegistry) {
        this.log = new LogContext("[AssignmentsManager id=" + i + "] ").logger(AssignmentsManager.class);
        this.backoff = exponentialBackoff;
        this.time = time;
        this.channelManager = nodeToControllerChannelManager;
        this.nodeId = i;
        this.directoryIdToDescription = function;
        this.metadataImageSupplier = supplier;
        this.ready = new ConcurrentHashMap<>();
        this.inflight = Collections.emptyMap();
        this.metricsRegistry = metricsRegistry;
        // The gauge must implement Gauge#value(); ready and inflight are already initialised
        // above, so the gauge can be read as soon as it is registered.
        this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
            @Override // com.yammer.metrics.core.Gauge
            public Integer value() {
                return Integer.valueOf(AssignmentsManager.this.numPending());
            }
        });
        this.previousGlobalFailures = 0;
        this.eventQueue = new KafkaEventQueue(time, new LogContext("[AssignmentsManager id=" + i + "]"), "broker-" + i + "-directory-assignments-manager-", new ShutdownEvent());
        nodeToControllerChannelManager.start();
    }

    /**
     * Returns the number of assignments that are either waiting to be sent or currently in
     * flight. The two sizes are read one after the other, not atomically.
     *
     * @return size of the ready map plus size of the in-flight batch
     */
    public int numPending() {
        int waiting = ready.size();
        int sent = inflight.size();
        return waiting + sent;
    }

    /**
     * Closes the event queue.
     * NOTE(review): the queue was constructed with a ShutdownEvent, so closing it presumably
     * runs that event — confirm against KafkaEventQueue.
     *
     * @throws InterruptedException if declared by the event queue's close and raised while closing
     */
    public void close() throws InterruptedException {
        eventQueue.close();
    }

    /**
     * Registers a replica-to-directory assignment to be sent to the controller and schedules a
     * send attempt. An assignment already queued for the same partition is replaced.
     *
     * @param topicIdPartition the partition being assigned
     * @param uuid             id of the directory the replica is assigned to
     * @param str              reason for the assignment; only used in trace logging
     * @param runnable         callback stored in the assignment as its success callback
     */
    public void onAssignment(TopicIdPartition topicIdPartition, Uuid uuid, String str, Runnable runnable) {
        long nowNs = time.nanoseconds();
        Assignment assignment = new Assignment(topicIdPartition, uuid, nowNs, runnable);
        ready.put(topicIdPartition, assignment);
        if (log.isTraceEnabled()) {
            // Prefer the topic name from the current metadata image; fall back to the topic id.
            Uuid topicId = assignment.topicIdPartition().topicId();
            String topicDescription = Optional.ofNullable(metadataImageSupplier.get().topics().getTopic(topicId))
                .map(topic -> topic.name())
                .orElse(assignment.topicIdPartition().topicId().toString());
            String directoryDescription = directoryIdToDescription.apply(assignment.directoryId());
            log.trace("Registered assignment {}: {}, moving {}-{} into {}",
                assignment,
                str,
                topicDescription,
                Integer.valueOf(topicIdPartition.partitionId()),
                directoryDescription);
        }
        rescheduleMaybeSendAssignmentsEvent(nowNs);
    }

    /**
     * Schedules a MaybeSendAssignmentsEvent on the event queue under the
     * {@code MAYBE_SEND_ASSIGNMENTS_EVENT} tag. The deadline is computed by an
     * AssignmentsManagerDeadlineFunction built from the backoff, the given time, the number of
     * previous global failures, whether a batch is in flight, and the number of ready
     * assignments.
     *
     * @param j the current time in nanoseconds
     */
    void rescheduleMaybeSendAssignmentsEvent(long j) {
        AssignmentsManagerDeadlineFunction deadlineFunction = new AssignmentsManagerDeadlineFunction(
            backoff,
            j,
            previousGlobalFailures,
            !inflight.isEmpty(),
            ready.size());
        eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT, deadlineFunction, new MaybeSendAssignmentsEvent());
    }

    /**
     * Sends a batch of ready assignments to the controller unless a batch is already in flight.
     *
     * <p>Up to 2250 assignments are moved out of the ready map. Those that are no longer valid
     * for this node under the current metadata image are discarded; the rest are sent as a
     * single request. Nothing is sent when the resulting batch is empty.
     */
    void maybeSendAssignments() {
        // Upper bound on the number of assignments placed in a single request.
        final int maxAssignmentsPerRequest = 2250;
        int inflightSize = this.inflight.size();
        if (this.log.isTraceEnabled()) {
            this.log.trace("maybeSendAssignments: inflightSize = {}.", Integer.valueOf(inflightSize));
        }
        if (inflightSize > 0) {
            this.log.trace("maybeSendAssignments: cannot send new assignments because there are {} still in flight.", Integer.valueOf(inflightSize));
            return;
        }
        MetadataImage image = this.metadataImageSupplier.get();
        Map<TopicIdPartition, Assignment> newInflight = new HashMap<>();
        int numInvalid = 0;
        Iterator<Assignment> iterator = this.ready.values().iterator();
        while (iterator.hasNext() && newInflight.size() < maxAssignmentsPerRequest) {
            Assignment assignment = iterator.next();
            // Every visited assignment leaves the ready map, whether it is sent or discarded.
            iterator.remove();
            if (assignment.valid(this.nodeId, image)) {
                newInflight.put(assignment.topicIdPartition(), assignment);
            } else {
                numInvalid++;
            }
        }
        this.log.info("maybeSendAssignments: sending {} assignments; invalidated {} assignments prior to sending.", Integer.valueOf(newInflight.size()), Integer.valueOf(numInvalid));
        if (!newInflight.isEmpty()) {
            sendAssignments(image.cluster().brokerEpoch(this.nodeId), newInflight);
        }
    }

    /**
     * Sends the given batch to the controller in one AssignReplicasToDirs request and records
     * it as the in-flight batch.
     *
     * @param j   broker epoch to put in the request
     * @param map the assignments to send, keyed by partition
     */
    void sendAssignments(long j, Map<TopicIdPartition, Assignment> map) {
        AssignReplicasToDirsRequestData requestData = buildRequestData(nodeId, j, map);
        AssignReplicasToDirsRequest.Builder requestBuilder = new AssignReplicasToDirsRequest.Builder(requestData);
        channelManager.sendRequest(requestBuilder, new CompletionHandler(map));
        // Marked in flight only after the request has been handed to the channel manager.
        inflight = map;
    }

    /**
     * Processes the outcome of a sent batch.
     *
     * <p>The in-flight batch is cleared first. On a global failure (timeout, disconnect,
     * response-level error, ...) the failure counter is incremented and every sent assignment
     * is put back into the ready map, unless a newer assignment for the same partition is
     * already queued. Otherwise the failure counter is reset, each partition result in the
     * response is handled individually, and any sent assignment that has no result in the
     * response is re-queued.
     *
     * @param map      the assignments that were sent; entries are removed from it as their
     *                 results are processed
     * @param optional the controller's response, or empty on timeout
     */
    void handleResponse(Map<TopicIdPartition, Assignment> map, Optional<ClientResponse> optional) {
        this.inflight = Collections.emptyMap();
        Optional<String> globalResponseError = globalResponseError(optional);
        if (globalResponseError.isPresent()) {
            this.previousGlobalFailures++;
            this.log.error("handleResponse: {} assignments failed; global error: {}. Retrying.", Integer.valueOf(map.size()), globalResponseError.get());
            // Re-queue the whole batch for retry; putIfAbsent keeps any newer assignment
            // registered for the same partition while the request was in flight.
            for (Map.Entry<TopicIdPartition, Assignment> entry : map.entrySet()) {
                this.ready.putIfAbsent(entry.getKey(), entry.getValue());
            }
            return;
        }
        this.previousGlobalFailures = 0;
        // Safe: globalResponseError returned empty, so the response is present and its body is
        // an AssignReplicasToDirsResponse.
        AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) optional.get().responseBody()).data();
        long nowNs = this.time.nanoseconds();
        for (AssignReplicasToDirsResponseData.DirectoryData directoryData : data.directories()) {
            for (AssignReplicasToDirsResponseData.TopicData topicData : directoryData.topics()) {
                for (AssignReplicasToDirsResponseData.PartitionData partitionData : topicData.partitions()) {
                    TopicIdPartition topicIdPartition = new TopicIdPartition(topicData.topicId(), partitionData.partitionIndex());
                    handleAssignmentResponse(topicIdPartition, map, Errors.forCode(partitionData.errorCode()), nowNs);
                    map.remove(topicIdPartition);
                }
            }
        }
        // Whatever remains in the batch was not mentioned in the response: retry it.
        for (Assignment assignment : map.values()) {
            this.ready.putIfAbsent(assignment.topicIdPartition(), assignment);
            this.log.error("handleResponse: no result in response for partition {}.", assignment.topicIdPartition());
        }
    }

    /**
     * Handles the result for a single partition of a response.
     *
     * <p>A partition that was not part of the request is logged and ignored. On success the
     * assignment's success callback is run, with any exception it throws logged and swallowed.
     * On error the assignment is put back into the ready map unless a newer one is already
     * queued; the error is logged only when debug logging is enabled or the assignment was
     * submitted more than {@code MIN_NOISY_FAILURE_INTERVAL_NS} ago.
     *
     * @param topicIdPartition the partition the result refers to
     * @param map              the assignments that were sent
     * @param errors           the result reported for the partition
     * @param j                the current time in nanoseconds
     */
    void handleAssignmentResponse(TopicIdPartition topicIdPartition, Map<TopicIdPartition, Assignment> map, Errors errors, long j) {
        Assignment sentAssignment = map.get(topicIdPartition);
        if (sentAssignment == null) {
            log.error("handleResponse: response contained topicIdPartition {}, but this was not in the request.", topicIdPartition);
            return;
        }
        if (!errors.equals(Errors.NONE)) {
            ready.putIfAbsent(topicIdPartition, sentAssignment);
            if (log.isDebugEnabled() || j > sentAssignment.submissionTimeNs() + MIN_NOISY_FAILURE_INTERVAL_NS) {
                log.error("handleResponse: error assigning {}: {}.", sentAssignment.topicIdPartition(), errors);
            }
            return;
        }
        try {
            sentAssignment.successCallback().run();
        } catch (Exception callbackFailure) {
            log.error("handleResponse: unexpected callback exception", callbackFailure);
        }
    }

    /**
     * Reads the global-failure counter by appending an event to the event queue and blocking
     * until that event has run.
     *
     * @return the value of the counter at the time the event ran
     * @throws ExecutionException   if the future is completed exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    int previousGlobalFailures() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        this.eventQueue.append(() -> future.complete(Integer.valueOf(this.previousGlobalFailures)));
        return future.get().intValue();
    }

    /**
     * Returns the size of the batch currently recorded as in flight.
     *
     * @return number of assignments in the in-flight map; 0 when nothing is in flight
     */
    int numInFlight() {
        Map<TopicIdPartition, Assignment> current = inflight;
        return current.size();
    }

    /**
     * Classifies a request outcome as a global (request-level) failure, if it is one.
     *
     * <p>The checks run in this order: missing response, authentication exception, timed out,
     * disconnected, version mismatch, missing body, unexpected body type, response-level error
     * code.
     *
     * @param optional the response, or empty when the request timed out
     * @return a short description of the first failure that applies, or empty when the response
     *         is an AssignReplicasToDirsResponse whose top-level error code is NONE
     */
    static Optional<String> globalResponseError(Optional<ClientResponse> optional) {
        if (!optional.isPresent()) {
            return Optional.of("Timeout");
        }
        ClientResponse clientResponse = optional.get();
        final String description;
        if (clientResponse.authenticationException() != null) {
            description = "AuthenticationException";
        } else if (clientResponse.wasTimedOut()) {
            // NOTE(review): "Disonnected" looks like a typo, but the string is returned to
            // callers, so it is kept as is.
            description = "Disonnected[Timeout]";
        } else if (clientResponse.wasDisconnected()) {
            description = "Disconnected";
        } else if (clientResponse.versionMismatch() != null) {
            description = "UnsupportedVersionException";
        } else if (clientResponse.responseBody() == null) {
            description = "EmptyResponse";
        } else if (!(clientResponse.responseBody() instanceof AssignReplicasToDirsResponse)) {
            description = "ClassCastException";
        } else {
            AssignReplicasToDirsResponse body = (AssignReplicasToDirsResponse) clientResponse.responseBody();
            Errors topLevelError = Errors.forCode(body.data().errorCode());
            if (topLevelError != Errors.NONE) {
                description = "Response-level error: " + topLevelError.name();
            } else {
                description = null;
            }
        }
        return Optional.ofNullable(description);
    }

    /**
     * Builds the AssignReplicasToDirs request payload for a batch of assignments, grouping
     * partitions by directory id and then by topic id.
     *
     * @param i   broker id to put in the request
     * @param j   broker epoch to put in the request
     * @param map the assignments to encode, keyed by partition
     * @return request data with one directory entry per distinct directory id, each holding one
     *         topic entry per distinct topic id assigned to that directory
     */
    static AssignReplicasToDirsRequestData buildRequestData(int i, long j, Map<TopicIdPartition, Assignment> map) {
        // Directory entries by directory id.
        Map<Uuid, AssignReplicasToDirsRequestData.DirectoryData> directories = new HashMap<>();
        // Topic entries by directory id, then topic id; used to find an existing topic entry
        // without scanning the directory's topic list.
        Map<Uuid, Map<Uuid, AssignReplicasToDirsRequestData.TopicData>> topicsByDirectory = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Assignment> entry : map.entrySet()) {
            TopicIdPartition topicIdPartition = entry.getKey();
            Uuid directoryId = entry.getValue().directoryId();
            AssignReplicasToDirsRequestData.DirectoryData directoryData = directories.computeIfAbsent(
                directoryId,
                id -> new AssignReplicasToDirsRequestData.DirectoryData().setId(directoryId));
            AssignReplicasToDirsRequestData.TopicData topicData = topicsByDirectory
                .computeIfAbsent(directoryId, id -> new HashMap<>())
                .computeIfAbsent(topicIdPartition.topicId(), topicId -> {
                    // First partition of this topic in this directory: create the topic entry
                    // and attach it to the directory entry.
                    AssignReplicasToDirsRequestData.TopicData created = new AssignReplicasToDirsRequestData.TopicData().setTopicId(topicId);
                    directoryData.topics().add(created);
                    return created;
                });
            topicData.partitions().add(new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(topicIdPartition.partitionId()));
        }
        return new AssignReplicasToDirsRequestData()
            .setBrokerId(i)
            .setBrokerEpoch(j)
            .setDirectories(new ArrayList<>(directories.values()));
    }
}
