package org.apache.helix.common.caches;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.pinot.$internal.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.$internal.com.google.common.collect.Lists;
import org.apache.pinot.$internal.com.google.common.collect.Maps;
import org.apache.pinot.$internal.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/common/caches/InstanceMessagesCache.class */
/**
 * Cache of the Helix messages addressed to each instance, plus the relay messages attached to them
 * and the messages that have been flagged as stale.
 *
 * <p>{@link #refresh} synchronises the per-instance message cache with the message names returned
 * by the {@link HelixDataAccessor}; {@link #updateRelayMessages} then validates the cached relay
 * messages and merges the surviving ones into the view served by {@link #getMessages}.
 *
 * <p>NOTE(review): this class does no synchronisation of its own; callers presumably invoke it from
 * a single thread - confirm.
 */
public class InstanceMessagesCache {
    /** System property that overrides the lifetime (in ms) of a relay message whose relay time is not set. */
    public static final String RELAY_MESSAGE_LIFETIME = "helix.controller.messagecache.relaymessagelifetime";

    private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName());
    private static final long DEFAULT_RELAY_MESSAGE_LIFETIME = TimeUnit.MINUTES.toMillis(60);

    // Read-only views handed out by getMessages()/getRelayMessages(). They start out empty so the
    // getters and updateRelayMessages() are safe to call before the first refresh().
    private Map<String, Map<String, Message>> _messageMap = Collections.emptyMap();
    private Map<String, Map<String, Message>> _relayMessageMap = Collections.emptyMap();

    private final String _clusterName;

    // instance name -> (message id -> message)
    private final Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
    // instance name -> (message id -> message) for messages registered through addStaleMessage()
    private final Map<String, Map<String, Message>> _staleMessageCache = Maps.newHashMap();
    // relay target instance name -> (relay message id -> relay message)
    private final Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap();
    // relay message id -> the message that carries (hosts) that relay message
    private final Map<String, Message> _relayHostMessageCache = Maps.newHashMap();

    private final long _relayMessageLifetime =
            HelixUtil.getSystemPropertyAsLong(RELAY_MESSAGE_LIFETIME, DEFAULT_RELAY_MESSAGE_LIFETIME);

    /**
     * @param str name of the cluster; passed to {@link RebalanceUtil#scheduleOnDemandPipeline} when a
     *            future pipeline run is scheduled
     */
    public InstanceMessagesCache(String str) {
        this._clusterName = str;
    }

    /**
     * Synchronises the message cache with the message names currently reported by the accessor for
     * every instance in {@code map}, reads only the messages that are not cached yet, publishes the
     * result as the view served by {@link #getMessages}, and finally prunes the stale message cache.
     *
     * @param helixDataAccessor accessor used to list message names and to batch-read new messages
     * @param map               live instances keyed by instance name
     * @return always {@code true}
     */
    public boolean refresh(HelixDataAccessor helixDataAccessor, Map<String, LiveInstance> map) {
        LOG.info("START: InstanceMessagesCache.refresh()");
        long startTime = System.currentTimeMillis();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        Map<String, Map<String, Message>> liveMessages = new HashMap<>();
        LinkedList<PropertyKey> newMessageKeys = Lists.newLinkedList();
        long purgeMillis = 0;

        for (String instanceName : map.keySet()) {
            Map<String, Message> cached = _messageCache.computeIfAbsent(instanceName, k -> new HashMap<>());
            liveMessages.put(instanceName, cached);

            Set<String> currentNames =
                    Sets.newHashSet(helixDataAccessor.getChildNames(keyBuilder.messages(instanceName)));

            // Drop cached messages that the accessor no longer lists.
            long purgeStart = System.currentTimeMillis();
            cached.keySet().retainAll(currentNames);
            purgeMillis += System.currentTimeMillis() - purgeStart;

            // Everything listed but not cached has to be read.
            for (String messageName : currentNames) {
                if (!cached.containsKey(messageName)) {
                    newMessageKeys.add(keyBuilder.message(instanceName, messageName));
                }
            }
        }

        if (!newMessageKeys.isEmpty()) {
            for (Message message : helixDataAccessor.<Message>getProperty(newMessageKeys, true)) {
                if (message == null) {
                    continue;
                }
                Map<String, Message> targetMessages = _messageCache.get(message.getTgtName());
                if (targetMessages == null) {
                    // The message names a target that has no cache entry; skip it instead of
                    // failing the whole refresh with a NullPointerException.
                    LOG.warn("No message cache entry for target {}, skip caching message {}",
                            message.getTgtName(), message.getId());
                    continue;
                }
                targetMessages.put(message.getId(), message);
            }
        }

        _messageMap = Collections.unmodifiableMap(liveMessages);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Message purge took: {} ", purgeMillis);
        }
        LOG.info("END: InstanceMessagesCache.refresh(), {} of Messages read from ZooKeeper. took {} ms. ",
                newMessageKeys.size(), System.currentTimeMillis() - startTime);
        refreshStaleMessageCache();
        return true;
    }

    /** Returns the live (mutable) stale message cache: instance name to message id to message. */
    @VisibleForTesting
    public Map<String, Map<String, Message>> getStaleMessageCache() {
        return _staleMessageCache;
    }

    /**
     * @param str instance name
     * @return a new set holding the stale messages recorded for the instance, or an empty set if
     *         there are none
     */
    public Set<Message> getStaleMessagesByInstance(String str) {
        Map<String, Message> staleMessages = _staleMessageCache.get(str);
        if (staleMessages == null) {
            return Collections.emptySet();
        }
        return new HashSet<>(staleMessages.values());
    }

    /**
     * Records {@code message} as stale for the given instance. An entry that already exists for the
     * same message id is left untouched.
     *
     * @param str     instance name
     * @param message the stale message
     */
    public void addStaleMessage(String str, Message message) {
        _staleMessageCache.computeIfAbsent(str, k -> new HashMap<>()).putIfAbsent(message.getMsgId(), message);
    }

    /**
     * Re-evaluates every cached relay message: caches the relay messages attached to the current
     * messages, evicts relay messages that are invalid, already committed, expired or past their
     * lifetime, schedules a pipeline run at the earliest upcoming expiry, and merges the surviving
     * relay messages into the view served by {@link #getMessages}.
     *
     * @param map  live instances keyed by instance name
     * @param map2 current states keyed by instance name, then session id, then resource name
     */
    public void updateRelayMessages(Map<String, LiveInstance> map,
            Map<String, Map<String, Map<String, CurrentState>>> map2) {
        // Pick up the relay messages carried by the messages we currently know about.
        for (Map<String, Message> instanceMessages : _messageMap.values()) {
            for (Message message : instanceMessages.values()) {
                if (message.hasRelayMessages()) {
                    for (Message relayMessage : message.getRelayMessages().values()) {
                        cacheRelayMessage(relayMessage, message);
                    }
                }
            }
        }

        long nextRebalanceTime = Long.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Map<String, Map<String, Message>> validRelayMessages = new HashMap<>();
        Set<String> emptiedTargets = new HashSet<>();

        for (Map.Entry<String, Map<String, Message>> cacheEntry : _relayMessageCache.entrySet()) {
            String targetInstance = cacheEntry.getKey();
            Map<String, Message> relayMessages = cacheEntry.getValue();
            Map<String, Message> committedMessages = _messageMap.get(targetInstance);

            Iterator<Message> iterator = relayMessages.values().iterator();
            while (iterator.hasNext()) {
                Message relayMessage = iterator.next();

                if (!relayMessage.isValid()) {
                    LOG.warn("Invalid relay message {}, remove it from the cache.", relayMessage.getId());
                    removeRelayMessage(iterator, relayMessage);
                    continue;
                }

                if (committedMessages != null && committedMessages.containsKey(relayMessage.getMsgId())) {
                    if (committedMessages.get(relayMessage.getMsgId()).isRelayMessage()) {
                        LOG.info("Relay message already committed, remove relay message {} from the cache.",
                                relayMessage.getId());
                        removeRelayMessage(iterator, relayMessage);
                        // Must not fall through: the entry is gone, so a later iterator.remove()
                        // would throw IllegalStateException and the message must not be re-published.
                        continue;
                    }
                    LOG.info("Controller already sent the message to the target host, set relay message {} to be expired.",
                            relayMessage.getId());
                    setMessageRelayTime(relayMessage, currentTime);
                }

                try {
                    checkTargetHost(targetInstance, relayMessage, map, map2);
                    checkRelayHost(relayMessage, map, map2, _relayHostMessageCache.get(relayMessage.getMsgId()));
                } catch (Exception e) {
                    // Best effort: a failed check leaves the message as it was.
                    LOG.warn("Failed to check target and relay host and set the relay time. Relay message: {} exception: {}",
                            relayMessage.getId(), e);
                }

                if (relayMessage.isExpired()) {
                    LOG.info("relay message {} expired, remove it from cache. relay time {}.",
                            relayMessage.getId(), relayMessage.getRelayTime());
                    removeRelayMessage(iterator, relayMessage);
                    continue;
                }

                // No relay time set and older than the configured lifetime: give up on it.
                if (relayMessage.getRelayTime() < 0
                        && relayMessage.getCreateTimeStamp() + _relayMessageLifetime < System.currentTimeMillis()) {
                    LOG.info("relay message {} has reached its lifetime, remove it from cache.", relayMessage.getId());
                    removeRelayMessage(iterator, relayMessage);
                    continue;
                }

                validRelayMessages.computeIfAbsent(targetInstance, k -> new HashMap<>())
                        .put(relayMessage.getMsgId(), relayMessage);

                long expiryTime = relayMessage.getCreateTimeStamp() + _relayMessageLifetime;
                if (relayMessage.getRelayTime() > 0) {
                    expiryTime = relayMessage.getRelayTime() + relayMessage.getExpiryPeriod();
                }
                nextRebalanceTime = Math.min(nextRebalanceTime, expiryTime);
            }

            if (relayMessages.isEmpty()) {
                emptiedTargets.add(targetInstance);
            }
        }

        _relayMessageCache.keySet().removeAll(emptiedTargets);
        if (nextRebalanceTime < Long.MAX_VALUE) {
            scheduleFuturePipeline(nextRebalanceTime);
        }
        _relayMessageMap = Collections.unmodifiableMap(validRelayMessages);

        // Make getMessages() return relay messages alongside regular ones. _messageMap is a
        // read-only wrapper, so merge into a copy of it and publish a new wrapper; the per-instance
        // maps are shared with the copy, exactly as they were shared with the previous view.
        Map<String, Map<String, Message>> mergedMessages = new HashMap<>(_messageMap);
        long relayMessageCount = 0;
        for (Map.Entry<String, Map<String, Message>> relayEntry : validRelayMessages.entrySet()) {
            mergedMessages.computeIfAbsent(relayEntry.getKey(), k -> new HashMap<>()).putAll(relayEntry.getValue());
            relayMessageCount += relayEntry.getValue().size();
        }
        _messageMap = Collections.unmodifiableMap(mergedMessages);

        LOG.info("END: updateRelayMessages(), {} of valid relay messages in cache, took {} ms. ",
                relayMessageCount, System.currentTimeMillis() - currentTime);
    }

    /** Removes the element last returned by {@code iterator} together with its hosted-message entry. */
    private void removeRelayMessage(Iterator<Message> iterator, Message relayMessage) {
        iterator.remove();
        _relayHostMessageCache.remove(relayMessage.getMsgId());
    }

    /**
     * Inspects the host the relay message is addressed to. The message is marked expired when the
     * host is not live or its session id differs from the message's target session id; its relay
     * time is set to now when the host has no current state for the resource or when the
     * partition's state is already the message's to-state or is no longer its from-state.
     */
    private void checkTargetHost(String targetHost, Message relayMessage, Map<String, LiveInstance> liveInstances,
            Map<String, Map<String, Map<String, CurrentState>>> currentStates) {
        long now = System.currentTimeMillis();
        String resourceName = relayMessage.getResourceName();
        String partitionName = relayMessage.getPartitionName();
        String sessionId = relayMessage.getTgtSessionId();

        if (!liveInstances.containsKey(targetHost)) {
            LOG.info("Target host is not alive anymore, expiring relay message {} immediately.", relayMessage.getId());
            relayMessage.setExpired(true);
            return;
        }
        if (!liveInstances.get(targetHost).getEphemeralOwner().equals(sessionId)) {
            LOG.info("Instance SessionId does not match, expiring relay message {} immediately.", relayMessage.getId());
            relayMessage.setExpired(true);
            return;
        }

        Map<String, Map<String, CurrentState>> sessionStates = currentStates.get(targetHost);
        if (sessionStates == null || !sessionStates.containsKey(sessionId)) {
            LOG.warn("CurrentStateMap null for {}, session {}, pending relay message {}",
                    targetHost, sessionId, relayMessage.getId());
            return;
        }

        CurrentState currentState = sessionStates.get(sessionId).get(resourceName);
        if (currentState == null) {
            setMessageRelayTime(relayMessage, now);
            LOG.warn("CurrentState is null for {} on {}, set relay time {} for message {}",
                    resourceName, targetHost, relayMessage.getRelayTime(), relayMessage.getId());
            return;
        }

        String partitionState = currentState.getState(partitionName);
        String toState = relayMessage.getToState();
        String fromState = relayMessage.getFromState();
        if (toState.equals(partitionState) || !fromState.equals(partitionState)) {
            setMessageRelayTime(relayMessage, now);
            LOG.debug("{}'s currentState {} on {} has changed, set relay message {} to be expired.",
                    partitionName, partitionState, targetHost, relayMessage.getId());
        }
    }

    /**
     * Inspects the host of the message that carries the relay message and sets the relay message's
     * relay time (or marks it expired) once that host is gone, has a new session, has no matching
     * current state, or has moved the partition away from the hosted message's from-state.
     *
     * <p>A {@code null} {@code hostedMessage} results in a {@link NullPointerException}, which the
     * caller in {@link #updateRelayMessages} catches and logs.
     */
    private void checkRelayHost(Message relayMessage, Map<String, LiveInstance> liveInstances,
            Map<String, Map<String, Map<String, CurrentState>>> currentStates, Message hostedMessage) {
        long now = System.currentTimeMillis();
        String sessionId = hostedMessage.getTgtSessionId();
        String relayHost = hostedMessage.getTgtName();
        String resourceName = hostedMessage.getResourceName();
        String partitionName = hostedMessage.getPartitionName();

        if (!liveInstances.containsKey(relayHost)) {
            setMessageRelayTime(relayMessage, now);
            return;
        }

        String liveSessionId = liveInstances.get(relayHost).getEphemeralOwner();
        if (!liveSessionId.equals(sessionId)) {
            // Arguments follow the placeholders: hosted message first, relay message second.
            LOG.info("Relay instance sessionId {} does not match sessionId {} in hosted message {}, set relay message {} to be expired.",
                    liveSessionId, sessionId, hostedMessage.getMsgId(), relayMessage.getId());
            setMessageRelayTime(relayMessage, now);
            return;
        }

        Map<String, Map<String, CurrentState>> sessionStates = currentStates.get(relayHost);
        if (sessionStates == null || !sessionStates.containsKey(sessionId)) {
            LOG.warn("CurrentStateMap null for {}, session {}, set relay messages {} to be expired. Hosted message {}.",
                    relayHost, sessionId, relayMessage.getId(), hostedMessage.getId());
            setMessageRelayTime(relayMessage, now);
            return;
        }

        CurrentState currentState = sessionStates.get(sessionId).get(resourceName);
        if (currentState == null) {
            LOG.info("No currentState found for {} on {}, set relay message {} to be expired.",
                    resourceName, relayHost, relayMessage.getId());
            setMessageRelayTime(relayMessage, now);
            return;
        }

        String partitionState = currentState.getState(partitionName);
        String toState = hostedMessage.getToState();
        String fromState = hostedMessage.getFromState();

        // The hosted transition has not started yet; nothing to do.
        if (fromState.equals(partitionState)) {
            return;
        }

        boolean cameFromFromState = fromState.equals(currentState.getPreviousState(partitionName));
        if (HelixDefinedState.ERROR.name().equals(partitionState) && cameFromFromState) {
            LOG.info("Partition {} got to ERROR from the top state, expiring relay message {} immediately. Hosted message {}.",
                    partitionName, relayMessage.getId(), hostedMessage.getId());
            relayMessage.setExpired(true);
            return;
        }

        if (toState.equals(partitionState) && cameFromFromState) {
            // The hosted transition completed: use the time it finished, provided that is later
            // than the relay message's creation time.
            long completionTime = currentState.getEndTime(partitionName);
            if (completionTime > relayMessage.getCreateTimeStamp()) {
                setMessageRelayTime(relayMessage, completionTime);
                LOG.error("Target state for partition {} matches the hosted message's target state, set relay message {} to be expired.",
                        partitionName, relayMessage.getId());
                return;
            }
        }

        setMessageRelayTime(relayMessage, now);
        LOG.info("Current state {} for partition {} does not match hosted message's from state, set relay message {} to be expired.",
                partitionState, partitionName, relayMessage.getId());
    }

    /**
     * Sets the relay time of {@code relayMessage} to {@code relayTime}, unless a relay time later
     * than the message's creation time and earlier than {@code relayTime} is already recorded.
     */
    private void setMessageRelayTime(Message relayMessage, long relayTime) {
        long existingRelayTime = relayMessage.getRelayTime();
        if (existingRelayTime > relayMessage.getCreateTimeStamp() && existingRelayTime < relayTime) {
            return;
        }
        relayMessage.setRelayTime(relayTime);
        LOG.info("Set relay message {} relay time at {}, to be expired at {}",
                relayMessage.getId(), relayTime, relayTime + relayMessage.getExpiryPeriod());
    }

    /** Asks {@link RebalanceUtil} for an on-demand pipeline run at the absolute time {@code rebalanceTime} (ms). */
    private void scheduleFuturePipeline(long rebalanceTime) {
        long delay = rebalanceTime - System.currentTimeMillis();
        RebalanceUtil.scheduleOnDemandPipeline(_clusterName, delay);
    }

    /**
     * @param str instance name
     * @return the messages for the instance from the most recent refresh (including relay messages
     *         merged in by {@link #updateRelayMessages}), or an empty map if there are none
     */
    public Map<String, Message> getMessages(String str) {
        return _messageMap.getOrDefault(str, Collections.emptyMap());
    }

    /**
     * @param str instance name
     * @return the relay messages kept for the instance by the most recent
     *         {@link #updateRelayMessages} call, or an empty map if there are none
     */
    public Map<String, Message> getRelayMessages(String str) {
        return _relayMessageMap.getOrDefault(str, Collections.emptyMap());
    }

    /**
     * Adds the given messages to the message cache under their target instance, together with any
     * relay messages they carry.
     *
     * @param collection messages to cache
     */
    public void cacheMessages(Collection<Message> collection) {
        for (Message message : collection) {
            _messageCache.computeIfAbsent(message.getTgtName(), k -> new HashMap<>())
                    .put(message.getId(), message);
            if (message.hasRelayMessages()) {
                for (Message relayMessage : message.getRelayMessages().values()) {
                    cacheRelayMessage(relayMessage, message);
                }
            }
        }
    }

    /** Stores {@code relayMessage} under its target instance and remembers which message hosts it. */
    private void cacheRelayMessage(Message relayMessage, Message hostedMessage) {
        Map<String, Message> targetRelayMessages =
                _relayMessageCache.computeIfAbsent(relayMessage.getTgtName(), k -> new HashMap<>());
        if (!targetRelayMessages.containsKey(relayMessage.getId())) {
            LOG.info("Add relay message to relay cache {}, hosted message {}",
                    relayMessage.getMsgId(), hostedMessage.getMsgId());
        }
        targetRelayMessages.put(relayMessage.getId(), relayMessage);
        _relayHostMessageCache.put(relayMessage.getMsgId(), hostedMessage);
    }

    /**
     * Drops every stale message whose id is no longer present in the message cache of its instance,
     * and drops an instance's entry once such a removal leaves it empty.
     */
    private void refreshStaleMessageCache() {
        LOG.info("Start to refresh stale message cache");
        Iterator<Map.Entry<String, Map<String, Message>>> iterator = _staleMessageCache.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Map<String, Message>> staleEntry = iterator.next();
            Map<String, Message> cachedMessages =
                    _messageCache.getOrDefault(staleEntry.getKey(), Collections.emptyMap());
            boolean removedAny =
                    staleEntry.getValue().keySet().removeIf(messageId -> !cachedMessages.containsKey(messageId));
            if (removedAny && staleEntry.getValue().isEmpty()) {
                iterator.remove();
            }
        }
    }

    @Override
    public String toString() {
        return "InstanceMessagesCache{_messageMap=" + _messageMap + ", _messageCache=" + _messageCache
                + ", _clusterName='" + _clusterName + "'}";
    }
}
