package org.apache.helix.controller.stages;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/controller/stages/MessageDispatchStage.class */
/**
 * Base pipeline stage that takes the messages produced for the resources being rebalanced,
 * optionally collapses them into batch messages, writes them through the
 * {@link HelixDataAccessor}, and hands the successfully written messages back to the
 * controller data provider for caching.
 */
public abstract class MessageDispatchStage extends AbstractBaseStage {
    private static final Logger logger = LoggerFactory.getLogger(MessageDispatchStage.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Collects every message in {@code messageOutput} for the resources attached to the event,
     * batches them where possible, sends them, and caches the ones that were sent.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read the Helix manager, resource map and data provider from the event and fail with a
     *       {@link StageException} if any of them (or the live-instance map) is missing.</li>
     *   <li>Gather the messages of every partition of every resource.</li>
     *   <li>Run them through {@link #batchMessage}.</li>
     *   <li>If the event carries an {@code EVENT_SESSION} attribute, require it to be present and
     *       equal to the manager's current session id; otherwise fail without sending.</li>
     *   <li>Send the messages, report them to the cluster status monitor (skipped when the data
     *       provider is a {@link WorkflowControllerDataProvider}), and cache the sent ones.</li>
     * </ol>
     *
     * @param clusterEvent  event carrying the stage attributes
     * @param messageOutput per-resource, per-partition messages to dispatch
     * @throws StageException if a required attribute is missing or the event session does not
     *                        match the controller session
     * @throws Exception      declared by the stage contract; propagated from collaborators
     */
    public void processEvent(ClusterEvent clusterEvent, MessageOutput messageOutput) throws Exception {
        this._eventId = clusterEvent.getEventId();
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        @SuppressWarnings("unchecked") // the attribute store is untyped; the writer is trusted to store this type
        Map<String, Resource> resourceMap =
                (Map<String, Resource>) clusterEvent.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
        BaseControllerDataProvider dataProvider =
                (BaseControllerDataProvider) clusterEvent.getAttribute(AttributeName.ControllerDataProvider.name());
        // Only dereference the provider once we know it exists, so that a missing attribute is
        // reported as a StageException below rather than surfacing as a NullPointerException.
        Map<String, LiveInstance> liveInstances = dataProvider == null ? null : dataProvider.getLiveInstances();
        if (helixManager == null || resourceMap == null || messageOutput == null || dataProvider == null || liveInstances == null) {
            throw new StageException("Missing attributes in event:" + clusterEvent + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
        }

        HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();

        // Flatten the per-resource / per-partition output into a single list.
        List<Message> pendingMessages = new ArrayList<>();
        for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
            for (Partition partition : resourceEntry.getValue().getPartitions()) {
                List<Message> partitionMessages = messageOutput.getMessages(resourceEntry.getKey(), partition);
                if (partitionMessages != null) {
                    pendingMessages.addAll(partitionMessages);
                }
            }
        }

        List<Message> batchedMessages = batchMessage(helixDataAccessor.keyBuilder(), pendingMessages, resourceMap, liveInstances, helixManager.getProperties());

        // Refuse to send anything if the event was created under a different controller session.
        if (clusterEvent.containsAttribute(AttributeName.EVENT_SESSION.name())) {
            @SuppressWarnings("unchecked") // same untyped attribute store as above
            Optional<String> eventSession =
                    (Optional<String>) clusterEvent.getAttribute(AttributeName.EVENT_SESSION.name());
            if (!eventSession.isPresent() || !eventSession.get().equals(helixManager.getSessionId())) {
                throw new StageException(String.format("Event session doesn't match controller %s session! Expected session: %s, actual: %s", helixManager.getInstanceName(), eventSession.orElse("NOT_PRESENT"), helixManager.getSessionId()));
            }
        } else {
            logger.info("Event {} does not have event session attribute", clusterEvent.getEventId());
        }

        List<Message> sentMessages = sendMessages(helixDataAccessor, batchedMessages);

        if (!(dataProvider instanceof WorkflowControllerDataProvider)) {
            ClusterStatusMonitor clusterStatusMonitor =
                    (ClusterStatusMonitor) clusterEvent.getAttribute(AttributeName.clusterStatusMonitor.name());
            if (clusterStatusMonitor != null) {
                // Reports every message handed to sendMessages, not only the ones that succeeded.
                clusterStatusMonitor.increaseMessageReceived(batchedMessages);
            }
        }

        long cacheStart = System.currentTimeMillis();
        dataProvider.cacheMessages(sentMessages);
        LogUtil.logDebug(logger, this._eventId, "Caching messages took " + (System.currentTimeMillis() - cacheStart) + " ms");
    }

    /**
     * Merges messages that can be delivered as a single batch message.
     *
     * <p>A message is batched only when its resource is known and has batch-message mode enabled,
     * and its target is a live instance whose Helix version supports the {@code batch_message}
     * feature. Batchable messages are grouped by the target's current-state path for the resource
     * plus the from/to states; the first message of a group is copied into a new batch message
     * and the partition name of every message in the group is added to it. All other messages are
     * passed through unchanged. Output order follows first appearance in {@code list}.
     *
     * @param builder                key builder used to derive the grouping key
     * @param list                   messages to process
     * @param map                    resources by name
     * @param map2                   live instances by instance name
     * @param helixManagerProperties used to check feature support per Helix version
     * @return the pass-through messages and the created batch messages
     */
    List<Message> batchMessage(PropertyKey.Builder builder, List<Message> list, Map<String, Resource> map, Map<String, LiveInstance> map2, HelixManagerProperties helixManagerProperties) {
        Map<String, Message> batchByKey = new HashMap<>();
        List<Message> output = new ArrayList<>();
        for (Message message : list) {
            Resource resource = map.get(message.getResourceName());
            LiveInstance liveInstance = map2.get(message.getTgtName());
            String helixVersion = liveInstance != null ? liveInstance.getHelixVersion() : null;
            boolean batchable = resource != null
                    && resource.getBatchMessageMode()
                    && helixVersion != null
                    && helixManagerProperties.isFeatureSupported("batch_message", helixVersion);
            if (!batchable) {
                output.add(message);
                continue;
            }

            String batchKey = builder.currentState(message.getTgtName(), message.getTgtSessionId(), message.getResourceName()).getPath() + "/" + message.getFromState() + "/" + message.getToState();
            Message batch = batchByKey.get(batchKey);
            if (batch == null) {
                // First message of this group: it becomes the template for the batch message.
                batch = new Message(message.getRecord());
                batch.setBatchMessageMode(true);
                output.add(batch);
                batchByKey.put(batchKey, batch);
            }
            batch.addPartitionName(message.getPartitionName());
        }
        return output;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Logs and writes the given messages through the data accessor in a single
     * {@code createChildren} call.
     *
     * <p>The boolean array returned by {@code createChildren} is read as one success flag per
     * submitted message, by index; a failed entry is logged as an error and left out of the
     * result.
     *
     * @param helixDataAccessor accessor used to build message keys and create the message nodes
     * @param list              messages to send; may be {@code null} or empty
     * @return a new list with the messages whose creation succeeded (empty if nothing was sent)
     */
    public List<Message> sendMessages(HelixDataAccessor helixDataAccessor, List<Message> list) {
        List<Message> sentMessages = new ArrayList<>();
        if (list == null || list.isEmpty()) {
            return sentMessages;
        }

        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        List<PropertyKey> messageKeys = new ArrayList<>();
        for (Message message : list) {
            LogUtil.logInfo(logger, this._eventId, "Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit " + message.getResourceName() + "." + message.getPartitionName() + "|" + message.getPartitionNames() + " from:" + message.getFromState() + " to:" + message.getToState() + ", relayMessages: " + message.getRelayMessages().size());
            if (message.hasRelayMessages()) {
                for (Message relayMessage : message.getRelayMessages().values()) {
                    LogUtil.logInfo(logger, this._eventId, "Sending Relay Message " + relayMessage.getMsgId() + " to " + relayMessage.getTgtName() + " transit " + relayMessage.getResourceName() + "." + relayMessage.getPartitionName() + "|" + relayMessage.getPartitionNames() + " from:" + relayMessage.getFromState() + " to:" + relayMessage.getToState() + ", relayFrom: " + relayMessage.getRelaySrcHost() + ", attached to message: " + message.getMsgId());
                }
            }
            messageKeys.add(keyBuilder.message(message.getTgtName(), message.getId()));
        }

        // A copy of the input is handed to the accessor; results are matched back to `list` by index.
        boolean[] created = helixDataAccessor.createChildren(messageKeys, new ArrayList<Message>(list));
        for (int i = 0; i < created.length; i++) {
            if (created[i]) {
                sentMessages.add(list.get(i));
            } else {
                LogUtil.logError(logger, this._eventId, "Failed to send message: " + messageKeys.get(i));
            }
        }
        return sentMessages;
    }
}
