package org.apache.helix.messaging.handling;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ParticipantManager;
import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/messaging/handling/HelixTaskExecutor.class */
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
    private static Logger LOG = LoggerFactory.getLogger((Class<?>) HelixTaskExecutor.class);
    private static AtomicLong thread_uid = new AtomicLong(0);
    public static final int DEFAULT_PARALLEL_TASKS = 40;
    protected final Map<String, MessageTaskInfo> _taskMap;
    private final Object _lock;
    private final StatusUpdateUtil _statusUpdateUtil;
    private final ParticipantStatusMonitor _monitor;
    public static final String MAX_THREADS = "maxThreads";
    private volatile boolean _isCleanState;
    private MessageQueueMonitor _messageQueueMonitor;
    private GenericHelixController _controller;
    private Long _lastSessionSyncTime;
    private String _freezeSessionId;
    private LiveInstance.LiveInstanceStatus _liveInstanceStatus;
    private static final int SESSION_SYNC_INTERVAL = 2000;
    private static final String SESSION_SYNC = "SESSION-SYNC";
    final ConcurrentHashMap<String, MsgHandlerFactoryRegistryItem> _hdlrFtyRegistry;
    final ConcurrentHashMap<String, ExecutorService> _executorMap;
    final ExecutorService _batchMessageExecutorService;
    final ConcurrentHashMap<String, String> _messageTaskMap;
    final Set<String> _knownMessageIds;
    final Set<String> _resourcesThreadpoolChecked;
    final Set<String> _transitionTypeThreadpoolChecked;
    final Set<String> _msgInfoBasedThreadpoolChecked;
    final Timer _timer;
    private boolean _isShuttingDown;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/messaging/handling/HelixTaskExecutor$MsgHandlerFactoryRegistryItem.class */
    public class MsgHandlerFactoryRegistryItem {
        private final MessageHandlerFactory _factory;
        private final int _threadPoolSize;
        private final int _resetTimeout;

        public MsgHandlerFactoryRegistryItem(MessageHandlerFactory messageHandlerFactory, int i, int i2) {
            if (messageHandlerFactory == null) {
                throw new NullPointerException("Message handler factory is null");
            }
            if (i <= 0) {
                throw new IllegalArgumentException("Illegal thread pool size: " + i);
            }
            if (i2 < 0) {
                throw new IllegalArgumentException("Illegal reset timeout: " + i2);
            }
            this._factory = messageHandlerFactory;
            this._threadPoolSize = i;
            this._resetTimeout = i2;
        }

        int threadPoolSize() {
            return this._threadPoolSize;
        }

        int getResetTimeout() {
            return this._resetTimeout;
        }

        MessageHandlerFactory factory() {
            return this._factory;
        }
    }

    public HelixTaskExecutor() {
        this(new ParticipantStatusMonitor(false, null), null);
    }

    public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor) {
        this(participantStatusMonitor, null);
    }

    public HelixTaskExecutor(ParticipantStatusMonitor participantStatusMonitor, MessageQueueMonitor messageQueueMonitor) {
        this._isCleanState = true;
        this._monitor = participantStatusMonitor;
        this._messageQueueMonitor = messageQueueMonitor;
        this._taskMap = new ConcurrentHashMap();
        this._hdlrFtyRegistry = new ConcurrentHashMap<>();
        this._executorMap = new ConcurrentHashMap<>();
        this._messageTaskMap = new ConcurrentHashMap<>();
        this._knownMessageIds = Collections.newSetFromMap(new ConcurrentHashMap());
        this._batchMessageExecutorService = Executors.newCachedThreadPool();
        this._monitor.createExecutorMonitor("BatchMessageExecutor", this._batchMessageExecutorService);
        this._resourcesThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap());
        this._transitionTypeThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap());
        this._msgInfoBasedThreadpoolChecked = Collections.newSetFromMap(new ConcurrentHashMap());
        this._lock = new Object();
        this._statusUpdateUtil = new StatusUpdateUtil();
        this._timer = new Timer("HelixTaskExecutor_Timer", true);
        this._isShuttingDown = false;
        this._liveInstanceStatus = LiveInstance.LiveInstanceStatus.NORMAL;
        startMonitorThread();
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void registerMessageHandlerFactory(MultiTypeMessageHandlerFactory multiTypeMessageHandlerFactory, int i, int i2) {
        Iterator<String> it2 = multiTypeMessageHandlerFactory.getMessageTypes().iterator();
        while (it2.hasNext()) {
            registerMessageHandlerFactory(it2.next(), multiTypeMessageHandlerFactory, i, i2);
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory) {
        registerMessageHandlerFactory(str, messageHandlerFactory, 40);
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory, int i) {
        registerMessageHandlerFactory(str, messageHandlerFactory, i, 200);
    }

    private void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory, int i, int i2) {
        if (messageHandlerFactory instanceof MultiTypeMessageHandlerFactory) {
            if (!((MultiTypeMessageHandlerFactory) messageHandlerFactory).getMessageTypes().contains(str)) {
                throw new HelixException("Message factory type mismatch. Type: " + str + ", factory: " + ((MultiTypeMessageHandlerFactory) messageHandlerFactory).getMessageTypes());
            }
        } else if (!messageHandlerFactory.getMessageType().equals(str)) {
            throw new HelixException("Message factory type mismatch. Type: " + str + ", factory: " + messageHandlerFactory.getMessageType());
        }
        this._isShuttingDown = false;
        MsgHandlerFactoryRegistryItem putIfAbsent = this._hdlrFtyRegistry.putIfAbsent(str, new MsgHandlerFactoryRegistryItem(messageHandlerFactory, i, i2));
        if (putIfAbsent != null) {
            LOG.info("Skip register message handler factory for type: {}, poolSize: {}, factory: {}, already existing factory: {}", str, Integer.valueOf(i), messageHandlerFactory, putIfAbsent.factory());
        } else {
            this._executorMap.computeIfAbsent(str, str2 -> {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i, runnable -> {
                    return new Thread(runnable, "HelixTaskExecutor-message_handle_thread_" + thread_uid.getAndIncrement());
                });
                this._monitor.createExecutorMonitor(str, newFixedThreadPool);
                return newFixedThreadPool;
            });
            LOG.info("Registered message handler factory for type: {}, poolSize: {}, factory: {}, pool: {}", str, Integer.valueOf(i), messageHandlerFactory, this._executorMap.get(str));
        }
    }

    public void setController(GenericHelixController genericHelixController) {
        this._controller = genericHelixController;
    }

    public ParticipantStatusMonitor getParticipantMonitor() {
        return this._monitor;
    }

    private void startMonitorThread() {
    }

    private void updateStateTransitionMessageThreadPool(Message message, HelixManager helixManager) {
        StateModelFactory.CustomizedExecutorService executorService;
        String messageIdentifier;
        if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
            String resourceName = message.getResourceName();
            String stateModelFactoryName = message.getStateModelFactoryName();
            String stateModelDef = message.getStateModelDef();
            if (stateModelFactoryName == null) {
                stateModelFactoryName = "DEFAULT";
            }
            StateModelFactory<? extends StateModel> stateModelFactory = helixManager.getStateMachineEngine().getStateModelFactory(stateModelDef, stateModelFactoryName);
            Message.MessageInfo messageInfo = new Message.MessageInfo(message);
            if (stateModelFactory != null && (executorService = stateModelFactory.getExecutorService(messageInfo)) != null && (messageIdentifier = messageInfo.getMessageIdentifier(executorService.getBase())) != null) {
                this._msgInfoBasedThreadpoolChecked.add(messageIdentifier);
                this._executorMap.put(messageIdentifier, executorService.getExecutorService());
                return;
            }
            String messageIdentifier2 = messageInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_STATE_TRANSITION_TYPE);
            if (messageIdentifier2 != null && stateModelFactory != null && !this._transitionTypeThreadpoolChecked.contains(messageIdentifier2)) {
                ExecutorService executorService2 = stateModelFactory.getExecutorService(resourceName, message.getFromState(), message.getToState());
                this._transitionTypeThreadpoolChecked.add(messageIdentifier2);
                if (executorService2 != null) {
                    this._executorMap.put(messageIdentifier2, executorService2);
                    LOG.info(String.format("Added client specified dedicate threadpool for resource %s from %s to %s", messageInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE), message.getFromState(), message.getToState()));
                    return;
                }
            }
            if (this._resourcesThreadpoolChecked.contains(resourceName)) {
                return;
            }
            int i = -1;
            ConfigAccessor configAccessor = helixManager.getConfigAccessor();
            if (configAccessor != null) {
                String str = configAccessor.get(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE).forCluster(helixManager.getClusterName()).forResource(resourceName).build(), MAX_THREADS);
                if (str != null) {
                    try {
                        i = Integer.parseInt(str);
                    } catch (Exception e) {
                        LOG.error("Failed to parse ThreadPoolSize from resourceConfig for resource" + resourceName, (Throwable) e);
                    }
                }
            }
            String messageIdentifier3 = messageInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.PER_RESOURCE);
            if (i > 0) {
                this._executorMap.put(messageIdentifier3, Executors.newFixedThreadPool(i, runnable -> {
                    return new Thread(runnable, "GerenricHelixController-message_handle_" + messageIdentifier3);
                }));
                LOG.info("Added dedicate threadpool for resource: " + resourceName + " with size: " + i);
            } else if (stateModelFactory != null) {
                ExecutorService executorService3 = stateModelFactory.getExecutorService(resourceName);
                if (executorService3 != null) {
                    this._executorMap.put(messageIdentifier3, executorService3);
                    LOG.info("Added client specified dedicate threadpool for resource: " + messageIdentifier3);
                }
            } else {
                LOG.error(String.format("Fail to get dedicate threadpool defined in stateModelFactory %s: using factoryName: %s for resource %s. No stateModelFactory was found!", stateModelDef, stateModelFactoryName, resourceName));
            }
            this._resourcesThreadpoolChecked.add(resourceName);
        }
    }

    ExecutorService findExecutorServiceForMsg(Message message) {
        ExecutorService executorService = this._executorMap.get(message.getMsgType());
        if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
            if (!message.getBatchMessageMode()) {
                Message.MessageInfo messageInfo = new Message.MessageInfo(message);
                int length = Message.MessageInfo.MessageIdentifierBase.values().length - 1;
                while (true) {
                    if (length >= 0) {
                        String messageIdentifier = messageInfo.getMessageIdentifier(Message.MessageInfo.MessageIdentifierBase.values()[length]);
                        if (messageIdentifier != null && this._executorMap.containsKey(messageIdentifier)) {
                            LOG.info(String.format("Find customized threadpool for %s", messageIdentifier));
                            executorService = this._executorMap.get(messageIdentifier);
                            break;
                        }
                        length--;
                    } else {
                        break;
                    }
                }
            } else {
                executorService = this._batchMessageExecutorService;
            }
        }
        return executorService;
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> list, long j, TimeUnit timeUnit) throws InterruptedException {
        if (list == null || list.size() == 0) {
            return null;
        }
        ExecutorService findExecutorServiceForMsg = findExecutorServiceForMsg(list.get(0).getMessage());
        for (int i = 1; i < list.size(); i++) {
            if (findExecutorServiceForMsg(list.get(i).getMessage()) != findExecutorServiceForMsg) {
                LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
                return null;
            }
        }
        return findExecutorServiceForMsg.invokeAll(list, j, timeUnit);
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean cancelTimeoutTask(MessageTask messageTask) {
        synchronized (this._lock) {
            String taskId = messageTask.getTaskId();
            if (!this._taskMap.containsKey(taskId)) {
                return false;
            }
            MessageTaskInfo messageTaskInfo = this._taskMap.get(taskId);
            removeMessageFromTaskAndFutureMap(messageTask.getMessage());
            if (messageTaskInfo._timerTask != null) {
                messageTaskInfo._timerTask.cancel();
            }
            return true;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean scheduleTask(MessageTask messageTask) {
        String taskId = messageTask.getTaskId();
        Message message = messageTask.getMessage();
        HelixManager manager = messageTask.getNotificationContext().getManager();
        try {
            updateStateTransitionMessageThreadPool(message, manager);
            LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(), message.getPartitionName(), message.getFromState(), message.getToState());
            this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager);
            synchronized (this._lock) {
                if (this._taskMap.containsKey(taskId)) {
                    this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "Message handling task already sheduled for " + taskId, manager);
                    return false;
                }
                ExecutorService findExecutorServiceForMsg = findExecutorServiceForMsg(message);
                if (findExecutorServiceForMsg == null) {
                    LOG.warn(String.format("Threadpool is null for type %s of message %s", message.getMsgType(), message.getMsgId()));
                    return false;
                }
                LOG.info("Submit task: " + taskId + " to pool: " + findExecutorServiceForMsg);
                Future submit = findExecutorServiceForMsg.submit(messageTask);
                this._messageTaskMap.putIfAbsent(getMessageTarget(message.getResourceName(), message.getPartitionName()), taskId);
                MessageTimeoutTask messageTimeoutTask = null;
                if (message.getExecutionTimeout() > 0) {
                    messageTimeoutTask = new MessageTimeoutTask(this, messageTask);
                    this._timer.schedule(messageTimeoutTask, message.getExecutionTimeout());
                    LOG.info("Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + messageTask.getTaskId());
                } else {
                    LOG.debug("Message does not have timeout. MsgId: " + messageTask.getTaskId());
                }
                this._taskMap.put(taskId, new MessageTaskInfo(messageTask, submit, messageTimeoutTask));
                LOG.info("Message: " + taskId + " handling task scheduled");
                return true;
            }
        } catch (Exception e) {
            LOG.error("Error while executing task. " + message, (Throwable) e);
            this._statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e, manager);
            return false;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean cancelTask(MessageTask messageTask) {
        Message message = messageTask.getMessage();
        NotificationContext notificationContext = messageTask.getNotificationContext();
        String taskId = messageTask.getTaskId();
        synchronized (this._lock) {
            if (this._taskMap.containsKey(taskId)) {
                MessageTaskInfo messageTaskInfo = this._taskMap.get(taskId);
                if (messageTaskInfo._timerTask != null) {
                    messageTaskInfo._timerTask.cancel();
                }
                Future<HelixTaskResult> future = messageTaskInfo.getFuture();
                removeMessageFromTaskAndFutureMap(message);
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId, notificationContext.getManager());
                if (future.cancel(true)) {
                    this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId, notificationContext.getManager());
                    this._taskMap.remove(taskId);
                    return true;
                }
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId, notificationContext.getManager());
            } else {
                this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId + ", future not found", notificationContext.getManager());
            }
            return false;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void finishTask(MessageTask messageTask) {
        Message message = messageTask.getMessage();
        String taskId = messageTask.getTaskId();
        LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message.getExecuteStartTimeStamp()));
        synchronized (this._lock) {
            if (this._taskMap.containsKey(taskId)) {
                MessageTaskInfo remove = this._taskMap.remove(taskId);
                removeMessageFromTaskAndFutureMap(message);
                if (remove._timerTask != null) {
                    remove._timerTask.cancel();
                }
            } else {
                LOG.warn("message " + taskId + " not found in task map");
            }
        }
    }

    private void updateMessageState(Collection<Message> collection, HelixDataAccessor helixDataAccessor, String str) {
        if (collection.isEmpty()) {
            return;
        }
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (Message message : collection) {
            arrayList.add(message);
            arrayList2.add(message.getKey(keyBuilder, str).getPath());
            arrayList3.add(zNRecord -> {
                if (zNRecord != null) {
                    return message.getRecord();
                }
                LOG.warn("Message {} targets at {} has already been removed before it is set as READ on instance {}", message.getId(), message.getTgtName(), str);
                return null;
            });
        }
        boolean[] updateChildren = helixDataAccessor.updateChildren(arrayList2, arrayList3, AccessOption.PERSISTENT);
        boolean z = false;
        for (int i = 0; i < arrayList.size(); i++) {
            Message message2 = (Message) arrayList.get(i);
            if (message2.getMsgState().equals(Message.MessageState.NEW)) {
                z = true;
            } else {
                this._knownMessageIds.add(message2.getId());
                if (!updateChildren[i]) {
                    LOG.error("Failed to update the message {}.", message2.getMsgId());
                }
            }
        }
        if (z) {
            sendNopMessage(helixDataAccessor, str);
        }
    }

    private void shutdownAndAwaitTermination(ExecutorService executorService, MsgHandlerFactoryRegistryItem msgHandlerFactoryRegistryItem) {
        LOG.info("Shutting down pool: " + executorService);
        int resetTimeout = msgHandlerFactoryRegistryItem == null ? 200 : msgHandlerFactoryRegistryItem.getResetTimeout();
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(resetTimeout, TimeUnit.MILLISECONDS)) {
                LOG.info("Tasks that never commenced execution after {}: {}", Integer.valueOf(resetTimeout), executorService.shutdownNow());
                if (!executorService.awaitTermination(resetTimeout, TimeUnit.MILLISECONDS)) {
                    LOG.error("Pool did not fully terminate in {} ms. pool: {}", Integer.valueOf(resetTimeout), executorService);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted when waiting for shutdown pool: " + executorService, (Throwable) e);
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    void unregisterMessageHandlerFactory(String str) {
        MsgHandlerFactoryRegistryItem remove = this._hdlrFtyRegistry.remove(str);
        ExecutorService remove2 = this._executorMap.remove(str);
        this._monitor.removeExecutorMonitor(str);
        LOG.info("Unregistering message handler factory for type: " + str + ", factory: " + remove.factory() + ", pool: " + remove2);
        if (remove2 != null) {
            shutdownAndAwaitTermination(remove2, remove);
        }
        if (remove != null) {
            remove.factory().reset();
        }
        LOG.info("Unregistered message handler factory for type: " + str + ", factory: " + remove.factory() + ", pool: " + remove2);
    }

    private void syncFactoryState() {
        LOG.info("Start to sync factory state");
        synchronized (this._hdlrFtyRegistry) {
            for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry : this._hdlrFtyRegistry.entrySet()) {
                MsgHandlerFactoryRegistryItem value = entry.getValue();
                if (value.factory() != null) {
                    try {
                        value.factory().sync();
                    } catch (Exception e) {
                        LOG.error("Failed to syncState the factory {} of message type {}.", value.factory(), entry.getKey(), e);
                    }
                }
            }
        }
    }

    private void shutdownExecutors() {
        synchronized (this._hdlrFtyRegistry) {
            Iterator it2 = this._hdlrFtyRegistry.keySet().iterator();
            while (it2.hasNext()) {
                String str = (String) it2.next();
                MsgHandlerFactoryRegistryItem msgHandlerFactoryRegistryItem = this._hdlrFtyRegistry.get(str);
                ExecutorService remove = this._executorMap.remove(str);
                this._monitor.removeExecutorMonitor(str);
                if (remove != null) {
                    LOG.info("Reset executor for msgType: " + str + ", pool: " + remove);
                    shutdownAndAwaitTermination(remove, msgHandlerFactoryRegistryItem);
                }
            }
        }
    }

    synchronized void reset() {
        if (this._isCleanState) {
            LOG.info("HelixTaskExecutor is in clean state, no need to reset again");
            return;
        }
        LOG.info("Reset HelixTaskExecutor");
        if (this._messageQueueMonitor != null) {
            this._messageQueueMonitor.reset();
        }
        shutdownExecutors();
        synchronized (this._hdlrFtyRegistry) {
            this._hdlrFtyRegistry.values().stream().map((v0) -> {
                return v0.factory();
            }).distinct().filter((v0) -> {
                return Objects.nonNull(v0);
            }).forEach(messageHandlerFactory -> {
                try {
                    messageHandlerFactory.reset();
                } catch (Exception e) {
                    LOG.error("Failed to reset the factory {}.", messageHandlerFactory.toString(), e);
                }
            });
        }
        StringBuilder sb = new StringBuilder();
        for (String str : this._taskMap.keySet()) {
            sb.append("Task: " + str + " fails to terminate. Message: " + this._taskMap.get(str)._task.getMessage() + "\n");
        }
        LOG.info(sb.toString());
        this._taskMap.clear();
        this._messageTaskMap.clear();
        this._knownMessageIds.clear();
        this._lastSessionSyncTime = null;
        this._isCleanState = true;
    }

    void init() {
        LOG.info("Init HelixTaskExecutor");
        if (this._messageQueueMonitor != null) {
            this._messageQueueMonitor.init();
        }
        this._isShuttingDown = false;
        Iterator it2 = this._hdlrFtyRegistry.keySet().iterator();
        while (it2.hasNext()) {
            String str = (String) it2.next();
            MsgHandlerFactoryRegistryItem msgHandlerFactoryRegistryItem = this._hdlrFtyRegistry.get(str);
            LOG.info("Setup the thread pool for type: {}, isShutdown: {}", str, Boolean.valueOf(this._executorMap.computeIfAbsent(str, str2 -> {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(msgHandlerFactoryRegistryItem.threadPoolSize(), runnable -> {
                    return new Thread(runnable, "HelixTaskExecutor-message_handle_" + str2);
                });
                this._monitor.createExecutorMonitor(str2, newFixedThreadPool);
                return newFixedThreadPool;
            }).isShutdown()));
        }
    }

    private void syncSessionToController(HelixManager helixManager) {
        if ((this._lastSessionSyncTime == null || System.currentTimeMillis() - this._lastSessionSyncTime.longValue() > 2000) && helixManager.getHelixDataAccessor().getProperty(new PropertyKey.Builder(helixManager.getClusterName()).controllerMessage(SESSION_SYNC)) == null) {
            LOG.info(String.format("Participant %s syncs session with controller", helixManager.getInstanceName()));
            Message message = new Message(Message.MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC);
            message.setSrcName(helixManager.getInstanceName());
            message.setTgtSessionId("*");
            message.setMsgState(Message.MessageState.NEW);
            message.setMsgId(SESSION_SYNC);
            Criteria criteria = new Criteria();
            criteria.setRecipientInstanceType(InstanceType.CONTROLLER);
            criteria.setSessionSpecific(false);
            helixManager.getMessagingService().send(criteria, message);
            this._lastSessionSyncTime = Long.valueOf(System.currentTimeMillis());
        }
    }

    private List<Message> readNewMessagesFromZK(HelixManager helixManager, String str, HelixConstants.ChangeType changeType) {
        HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        HashSet<String> hashSet = new HashSet();
        if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) {
            hashSet.addAll(helixDataAccessor.getChildNames(keyBuilder.messages(str)));
        } else {
            if (!changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) {
                LOG.warn("Unexpected ChangeType for Message Change CallbackHandler: " + changeType);
                return Collections.emptyList();
            }
            hashSet.addAll(helixDataAccessor.getChildNames(keyBuilder.controllerMessages()));
        }
        hashSet.removeAll(this._knownMessageIds);
        ArrayList arrayList = new ArrayList();
        for (String str2 : hashSet) {
            if (changeType.equals(HelixConstants.ChangeType.MESSAGE)) {
                arrayList.add(keyBuilder.message(str, str2));
            } else if (changeType.equals(HelixConstants.ChangeType.MESSAGES_CONTROLLER)) {
                arrayList.add(keyBuilder.controllerMessage(str2));
            }
        }
        List<Message> property = helixDataAccessor.getProperty(arrayList, false);
        Iterator<Message> it2 = property.iterator();
        while (it2.hasNext()) {
            if (it2.next() == null) {
                it2.remove();
            }
        }
        return property;
    }

    @Override // org.apache.helix.api.listeners.MessageListener
    @PreFetch(enabled = false)
    public void onMessage(String str, List<Message> list, NotificationContext notificationContext) {
        HelixManager manager = notificationContext.getManager();
        if (notificationContext.getType() == NotificationContext.Type.FINALIZE) {
            reset();
            return;
        }
        if (notificationContext.getType() == NotificationContext.Type.INIT) {
            init();
        }
        this._isCleanState = false;
        if (list == null || list.isEmpty()) {
            list = readNewMessagesFromZK(manager, str, notificationContext.getChangeType());
        }
        if (this._isShuttingDown) {
            StringBuilder sb = new StringBuilder();
            Iterator<Message> it2 = list.iterator();
            while (it2.hasNext()) {
                sb.append(it2.next().getMsgId() + ",");
            }
            LOG.info("Helix task executor is shutting down, ignore unprocessed messages : " + sb.toString());
            return;
        }
        if (this._messageQueueMonitor != null) {
            this._messageQueueMonitor.setMessageQueueBacklog(list.size());
        }
        if (list.isEmpty()) {
            LOG.info("No Messages to process");
            return;
        }
        Collections.sort(list, Message.CREATE_TIME_COMPARATOR);
        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        HashMap hashMap3 = new HashMap();
        String sessionId = manager.getSessionId();
        List<String> childNames = helixDataAccessor.getChildNames(keyBuilder.currentStates(str, sessionId));
        List<String> childNames2 = helixDataAccessor.getChildNames(keyBuilder.taskCurrentStates(str, sessionId));
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        HashSet hashSet = new HashSet();
        for (Message message : list) {
            if (!checkAndProcessNoOpMessage(message, str, notificationContext, manager, sessionId, hashMap)) {
                NotificationContext m10371clone = notificationContext.m10371clone();
                MessageHandler messageHandler = null;
                try {
                    messageHandler = createMessageHandler(message, m10371clone);
                } catch (Exception e) {
                    int retryCount = message.getRetryCount();
                    LOG.error("Exception happens when creating Message Handler for message {}. Current remaining retry count is {}.", message.getMsgId(), Integer.valueOf(retryCount));
                    message.setRetryCount(retryCount - 1);
                    message.setExecuteSessionId(sessionId);
                    if (message.getRetryCount() <= 0) {
                        updateUnprocessableMessage(message, null, String.format("No available message Handler found! Stop processing message %s since it has zero or negative remaining retry count %d!", message.getMsgId(), Integer.valueOf(message.getRetryCount())), manager);
                    }
                    hashMap3.put(message.getId(), message);
                }
                if (messageHandler == null) {
                    LOG.warn("There is no existing handler for message {}. Skip processing it for now. Will retry on the next callback.", message.getMsgId());
                } else {
                    if (!message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name()) && !message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())) {
                        arrayList.add(messageHandler);
                        arrayList2.add(m10371clone);
                    } else if (validateAndProcessStateTransitionMessage(message, manager, hashMap, messageHandler)) {
                        String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
                        hashMap.put(messageTarget, messageHandler);
                        hashMap2.put(messageTarget, m10371clone);
                    } else {
                        removeMessageFromZK(helixDataAccessor, message, str);
                    }
                    Message markReadMessage = markReadMessage(message, m10371clone, manager);
                    hashMap3.put(markReadMessage.getId(), markReadMessage);
                    if (!message.isControlerMsg() && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name())) {
                        String resourceName = message.getResourceName();
                        if (!childNames.contains(resourceName) && !childNames2.contains(resourceName) && !hashSet.contains(resourceName)) {
                            hashSet.add(resourceName);
                            PropertyKey currentState = keyBuilder.currentState(str, sessionId, resourceName);
                            if (TaskConstants.STATE_MODEL_NAME.equals(message.getStateModelDef()) && !Boolean.getBoolean(SystemPropertyKeys.TASK_CURRENT_STATE_PATH_DISABLED)) {
                                currentState = keyBuilder.taskCurrentState(str, sessionId, resourceName);
                            }
                            arrayList3.add(currentState);
                            CurrentState currentState2 = new CurrentState(resourceName);
                            currentState2.setBucketSize(message.getBucketSize());
                            currentState2.setStateModelDefRef(message.getStateModelDef());
                            currentState2.setSessionId(sessionId);
                            currentState2.setBatchMessageMode(message.getBatchMessageMode());
                            String stateModelFactoryName = message.getStateModelFactoryName();
                            if (stateModelFactoryName != null) {
                                currentState2.setStateModelFactoryName(stateModelFactoryName);
                            } else {
                                currentState2.setStateModelFactoryName("DEFAULT");
                            }
                            arrayList4.add(currentState2);
                        }
                    }
                }
            }
        }
        if (arrayList3.size() > 0) {
            try {
                helixDataAccessor.createChildren(arrayList3, arrayList4);
            } catch (Exception e2) {
                LOG.error("fail to create cur-state znodes for messages: " + hashMap3, (Throwable) e2);
            }
        }
        updateMessageState(hashMap3.values(), helixDataAccessor, str);
        for (Map.Entry<String, MessageHandler> entry : hashMap.entrySet()) {
            MessageHandler value = entry.getValue();
            if (!scheduleTaskForMessage(str, helixDataAccessor, value, (NotificationContext) hashMap2.get(entry.getKey())) && !this._isShuttingDown) {
                try {
                    value.onError(new HelixException(String.format("Failed to schedule the task for executing message handler for %s.", value._message.getMsgId())), MessageHandler.ErrorCode.ERROR, MessageHandler.ErrorType.FRAMEWORK);
                } catch (Exception e3) {
                    LOG.error("Failed to trigger onError method of the message handler for {}", value._message.getMsgId(), e3);
                }
            }
        }
        for (int i = 0; i < arrayList.size(); i++) {
            scheduleTaskForMessage(str, helixDataAccessor, (MessageHandler) arrayList.get(i), (NotificationContext) arrayList2.get(i));
        }
    }

    private boolean checkAndProcessNoOpMessage(Message message, String str, NotificationContext notificationContext, HelixManager helixManager, String str2, Map<String, MessageHandler> map) {
        HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
        try {
            if (message.getMsgType().equalsIgnoreCase(Message.MessageType.NO_OP.toString())) {
                LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
                reportAndRemoveMessage(message, helixDataAccessor, str, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
                return true;
            }
            String tgtSessionId = message.getTgtSessionId();
            if (!str2.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
                String str3 = "SessionId does NOT match. expected sessionId: " + str2 + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
                LOG.warn(str3);
                reportAndRemoveMessage(message, helixDataAccessor, str, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
                this._statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, str3, helixManager);
                if ((helixManager.getInstanceType() != InstanceType.PARTICIPANT && helixManager.getInstanceType() != InstanceType.CONTROLLER_PARTICIPANT) || message.getCreateTimeStamp() <= helixManager.getSessionStartTime().longValue()) {
                    return true;
                }
                syncSessionToController(helixManager);
                return true;
            }
            if ((helixManager.getInstanceType() == InstanceType.CONTROLLER || helixManager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) && Message.MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
                LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE msg from src: %s", message.getMsgSrc()));
                this._controller.onLiveInstanceChange(helixManager.getHelixDataAccessor().getChildValues(new PropertyKey.Builder(helixManager.getClusterName()).liveInstances(), true), notificationContext);
                reportAndRemoveMessage(message, helixDataAccessor, str, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
                return true;
            }
            if (Message.MessageState.NEW != message.getMsgState()) {
                if (!LOG.isTraceEnabled()) {
                    return true;
                }
                LOG.trace("Message already read. msgId: " + message.getMsgId());
                return true;
            }
            if (message.isExpired()) {
                LOG.info("Dropping expired message. mid: " + message.getId() + ", from: " + message.getMsgSrc() + " relayed from: " + message.getRelaySrcHost());
                reportAndRemoveMessage(message, helixDataAccessor, str, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
                return true;
            }
            if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name()) && cancelNotStartedStateTransition(message, map, helixDataAccessor, str)) {
                return true;
            }
            if (!Message.MessageType.PARTICIPANT_STATUS_CHANGE.name().equals(message.getMsgType())) {
                this._monitor.reportReceivedMessage(message);
                return false;
            }
            changeParticipantStatus(str, LiveInstance.LiveInstanceStatus.valueOf(message.getToState()), helixManager);
            reportAndRemoveMessage(message, helixDataAccessor, str, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
            return true;
        } catch (Exception e) {
            LOG.error("Failed to process the message {}. Deleting the message from ZK. Exception: {}", message, e);
            removeMessageFromTaskAndFutureMap(message);
            removeMessageFromZK(helixDataAccessor, message, str);
            return true;
        }
    }

    private boolean validateAndProcessStateTransitionMessage(Message message, HelixManager helixManager, Map<String, MessageHandler> map, MessageHandler messageHandler) {
        String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
        try {
            if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name()) && isStateTransitionInProgress(messageTarget)) {
                Message message2 = this._taskMap.get(this._messageTaskMap.get(messageTarget)).getTask().getMessage();
                updateUnprocessableMessage(message, null, String.format("Another state transition for %s:%s is in progress with msg: %s, p2p: %s, read: %d, current:%d. Discarding %s->%s message", message.getResourceName(), message.getPartitionName(), message2.getMsgId(), Boolean.valueOf(message2.isRelayMessage()), Long.valueOf(message2.getReadTimeStamp()), Long.valueOf(System.currentTimeMillis()), message.getFromState(), message.getToState()), helixManager);
                return false;
            }
            if (messageHandler instanceof HelixStateTransitionHandler) {
                HelixStateTransitionHandler.StaleMessageValidateResult staleMessageValidator = ((HelixStateTransitionHandler) messageHandler).staleMessageValidator();
                if (!staleMessageValidator.isValid) {
                    updateUnprocessableMessage(message, null, staleMessageValidator.exception.getMessage(), helixManager);
                    return false;
                }
            }
            if (!map.containsKey(messageTarget)) {
                return true;
            }
            Message message3 = map.get(messageTarget)._message;
            updateUnprocessableMessage(message, null, String.format("Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s", message.getMsgId(), message3.getFromState(), message3.getToState(), message.getFromState(), message.getToState()), helixManager);
            return false;
        } catch (Exception e) {
            updateUnprocessableMessage(message, e, "State transition validation failed with Exception.", helixManager);
            return false;
        }
    }

    private boolean scheduleTaskForMessage(String str, HelixDataAccessor helixDataAccessor, MessageHandler messageHandler, NotificationContext notificationContext) {
        Message message = messageHandler._message;
        if (scheduleTask(new HelixTask(message, notificationContext, messageHandler, this))) {
            return true;
        }
        removeMessageFromTaskAndFutureMap(message);
        removeMessageFromZK(helixDataAccessor, message, str);
        return false;
    }

    private boolean isStateTransitionInProgress(String str) {
        synchronized (this._lock) {
            if (this._messageTaskMap.containsKey(str)) {
                return !this._taskMap.get(this._messageTaskMap.get(str)).getFuture().isDone();
            }
            return false;
        }
    }

    private boolean cancelNotStartedStateTransition(Message message, Map<String, MessageHandler> map, HelixDataAccessor helixDataAccessor, String str) {
        Message message2;
        ParticipantMessageMonitor.ProcessedMessageState processedMessageState;
        String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
        if (map.containsKey(messageTarget)) {
            message2 = map.get(messageTarget).getMessage();
            if (isCancelingSameStateTransition(message2, message)) {
                map.remove(messageTarget);
                processedMessageState = ParticipantMessageMonitor.ProcessedMessageState.COMPLETED;
            } else {
                processedMessageState = ParticipantMessageMonitor.ProcessedMessageState.DISCARDED;
            }
        } else {
            if (!this._messageTaskMap.containsKey(messageTarget)) {
                return false;
            }
            String str2 = this._messageTaskMap.get(messageTarget);
            HelixTask helixTask = (HelixTask) this._taskMap.get(str2).getTask();
            Future<HelixTaskResult> future = this._taskMap.get(str2).getFuture();
            message2 = helixTask.getMessage();
            if (!isCancelingSameStateTransition(helixTask.getMessage(), message)) {
                processedMessageState = ParticipantMessageMonitor.ProcessedMessageState.DISCARDED;
            } else {
                if (!helixTask.cancel()) {
                    return false;
                }
                future.cancel(false);
                this._messageTaskMap.remove(messageTarget);
                this._taskMap.remove(str2);
                processedMessageState = ParticipantMessageMonitor.ProcessedMessageState.COMPLETED;
            }
        }
        removeMessageFromZK(helixDataAccessor, message2, str);
        this._monitor.reportProcessedMessage(message2, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
        reportAndRemoveMessage(message, helixDataAccessor, str, processedMessageState);
        return true;
    }

    private void reportAndRemoveMessage(Message message, HelixDataAccessor helixDataAccessor, String str, ParticipantMessageMonitor.ProcessedMessageState processedMessageState) {
        this._monitor.reportReceivedMessage(message);
        this._monitor.reportProcessedMessage(message, processedMessageState);
        removeMessageFromZK(helixDataAccessor, message, str);
    }

    private Message markReadMessage(Message message, NotificationContext notificationContext, HelixManager helixManager) {
        message.setMsgState(Message.MessageState.READ);
        message.setReadTimeStamp(new Date().getTime());
        message.setExecuteSessionId(notificationContext.getManager().getSessionId());
        this._statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", helixManager);
        return message;
    }

    private void updateUnprocessableMessage(Message message, Exception exc, String str, HelixManager helixManager) {
        String str2 = "Message " + message.getMsgId() + " cannot be processed: " + message.getRecord();
        if (exc != null) {
            LOG.error(str2, (Throwable) exc);
            this._statusUpdateUtil.logError(message, HelixStateMachineEngine.class, exc, str2, helixManager);
        } else {
            LOG.error(str2 + str);
            this._statusUpdateUtil.logError(message, HelixStateMachineEngine.class, str, helixManager);
        }
        message.setMsgState(Message.MessageState.UNPROCESSABLE);
        this._monitor.reportProcessedMessage(message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
    }

    public MessageHandler createMessageHandler(Message message, NotificationContext notificationContext) {
        String msgType = message.getMsgType();
        MsgHandlerFactoryRegistryItem msgHandlerFactoryRegistryItem = this._hdlrFtyRegistry.get(msgType);
        if (msgHandlerFactoryRegistryItem == null) {
            LOG.warn("Fail to find message handler factory for type: " + msgType + " msgId: " + message.getMsgId());
            return null;
        }
        MessageHandlerFactory factory = msgHandlerFactoryRegistryItem.factory();
        notificationContext.add(NotificationContext.MapKey.TASK_EXECUTOR.toString(), this);
        return factory.createHandler(message, notificationContext);
    }

    private void removeMessageFromTaskAndFutureMap(Message message) {
        this._knownMessageIds.remove(message.getId());
        String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName());
        if (this._messageTaskMap.containsKey(messageTarget)) {
            this._messageTaskMap.remove(messageTarget);
        }
    }

    private boolean isCancelingSameStateTransition(Message message, Message message2) {
        return message.getFromState().equalsIgnoreCase(message2.getFromState()) && message.getToState().equalsIgnoreCase(message2.getToState());
    }

    String getMessageTarget(String str, String str2) {
        return String.format("%s_%s", str, str2);
    }

    private void changeParticipantStatus(String str, LiveInstance.LiveInstanceStatus liveInstanceStatus, HelixManager helixManager) {
        if (liveInstanceStatus == null) {
            LOG.warn("To status is null! Skip participant status change.");
            return;
        }
        LOG.info("Changing participant {} status to {} from {}", str, liveInstanceStatus, this._liveInstanceStatus);
        HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
        String sessionId = helixManager.getSessionId();
        String path = helixDataAccessor.keyBuilder().liveInstance(str).getPath();
        boolean z = false;
        switch (liveInstanceStatus) {
            case FROZEN:
                this._freezeSessionId = sessionId;
                this._liveInstanceStatus = liveInstanceStatus;
                z = helixDataAccessor.getBaseDataAccessor().update(path, zNRecord -> {
                    zNRecord.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(), liveInstanceStatus);
                    return zNRecord;
                }, AccessOption.EPHEMERAL);
                break;
            case NORMAL:
                if (this._freezeSessionId != null && !this._freezeSessionId.equals(sessionId)) {
                    syncFactoryState();
                    ParticipantManager.carryOverPreviousCurrentState(helixDataAccessor, str, sessionId, helixManager.getStateMachineEngine(), false);
                }
                this._freezeSessionId = null;
                this._liveInstanceStatus = liveInstanceStatus;
                z = helixDataAccessor.getBaseDataAccessor().update(path, zNRecord2 -> {
                    zNRecord2.getSimpleFields().remove(LiveInstance.LiveInstanceProperty.STATUS.name());
                    return zNRecord2;
                }, AccessOption.EPHEMERAL);
                break;
            default:
                LOG.warn("To status {} is not supported", liveInstanceStatus);
                break;
        }
        LOG.info("Changed participant {} status to {}. FreezeSessionId={}, update success={}", str, this._liveInstanceStatus, this._freezeSessionId, Boolean.valueOf(z));
    }

    public LiveInstance.LiveInstanceStatus getLiveInstanceStatus() {
        return this._liveInstanceStatus;
    }

    private void removeMessageFromZK(HelixDataAccessor helixDataAccessor, Message message, String str) {
        if (HelixUtil.removeMessageFromZK(helixDataAccessor, message, str)) {
            LOG.info("Successfully removed message {} from ZK.", message.getMsgId());
        } else {
            LOG.warn("Failed to remove message {} from ZK.", message.getMsgId());
        }
    }

    private void sendNopMessage(HelixDataAccessor helixDataAccessor, String str) {
        try {
            Message message = new Message(Message.MessageType.NO_OP, UUID.randomUUID().toString());
            message.setSrcName(str);
            message.setTgtName(str);
            helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(message.getTgtName(), message.getId()), message);
            LOG.info("Send NO_OP message to {}, msgId: {}.", message.getTgtName(), message.getId());
        } catch (Exception e) {
            LOG.error("Failed to send NO_OP message to {}.", str, e);
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void shutdown() {
        LOG.info("Shutting down HelixTaskExecutor");
        this._isShuttingDown = true;
        this._timer.cancel();
        shutdownExecutors();
        reset();
        this._monitor.shutDown();
        LOG.info("Shutdown HelixTaskExecutor finished");
    }
}
