package org.apache.helix.manager.zk;

import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/manager/zk/ParticipantManager.class */
public class ParticipantManager {
    private static Logger LOG = LoggerFactory.getLogger((Class<?>) ParticipantManager.class);
    private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";
    final RealmAwareZkClient _zkclient;
    final HelixManager _manager;
    final PropertyKey.Builder _keyBuilder;
    final String _clusterName;
    final String _instanceName;
    final int _sessionTimeout;
    final ConfigAccessor _configAccessor;
    final InstanceType _instanceType;
    final HelixAdmin _helixAdmin;
    final ZKHelixDataAccessor _dataAccessor;
    final DefaultMessagingService _messagingService;
    final StateMachineEngine _stateMachineEngine;
    final LiveInstanceInfoProvider _liveInstanceInfoProvider;
    final List<PreConnectCallback> _preConnectCallbacks;
    final HelixManagerProperty _helixManagerProperty;
    private final String _sessionId;

    @Deprecated
    public ParticipantManager(HelixManager helixManager, RealmAwareZkClient realmAwareZkClient, int i, LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> list, String str) {
        this(helixManager, realmAwareZkClient, i, liveInstanceInfoProvider, list, str, null);
    }

    public ParticipantManager(HelixManager helixManager, RealmAwareZkClient realmAwareZkClient, int i, LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> list, String str, HelixManagerProperty helixManagerProperty) {
        this._zkclient = realmAwareZkClient;
        this._manager = helixManager;
        this._clusterName = helixManager.getClusterName();
        this._instanceName = helixManager.getInstanceName();
        this._keyBuilder = new PropertyKey.Builder(this._clusterName);
        this._sessionId = str;
        this._sessionTimeout = i;
        this._configAccessor = helixManager.getConfigAccessor();
        this._instanceType = helixManager.getInstanceType();
        this._helixAdmin = helixManager.getClusterManagmentTool();
        this._dataAccessor = (ZKHelixDataAccessor) helixManager.getHelixDataAccessor();
        this._messagingService = (DefaultMessagingService) helixManager.getMessagingService();
        this._stateMachineEngine = helixManager.getStateMachineEngine();
        this._liveInstanceInfoProvider = liveInstanceInfoProvider;
        this._preConnectCallbacks = list;
        this._helixManagerProperty = helixManagerProperty;
    }

    public void handleNewSession() throws Exception {
        String hexSessionId = ZKUtil.toHexSessionId(this._zkclient.getSessionId());
        if (!hexSessionId.equals(this._sessionId)) {
            throw new HelixException("Failed to handle new session for participant. There is a session mismatch: participant manager session = " + this._sessionId + ", zk client session = " + hexSessionId);
        }
        joinCluster();
        Iterator<PreConnectCallback> it2 = this._preConnectCallbacks.iterator();
        while (it2.hasNext()) {
            it2.next().onPreConnect();
        }
        createLiveInstance();
        if (shouldCarryOver()) {
            carryOverPreviousCurrentState(this._dataAccessor, this._instanceName, this._sessionId, this._manager.getStateMachineEngine(), true);
        }
        removePreviousTaskCurrentStates();
        setupMsgHandler();
    }

    private boolean shouldCarryOver() {
        if (this._liveInstanceInfoProvider == null || this._liveInstanceInfoProvider.getAdditionalLiveInstanceInfo() == null) {
            return true;
        }
        return !LiveInstance.LiveInstanceStatus.FROZEN.name().equals(this._liveInstanceInfoProvider.getAdditionalLiveInstanceInfo().getSimpleField(LiveInstance.LiveInstanceProperty.STATUS.name()));
    }

    private void joinCluster() {
        InstanceConfig composeInstanceConfig;
        boolean z = false;
        boolean z2 = false;
        try {
            z = Boolean.parseBoolean(this._configAccessor.get(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this._manager.getClusterName()).build(), "allowParticipantAutoJoin"));
            LOG.info("instance: " + this._instanceName + " auto-joining " + this._clusterName + " is " + z);
        } catch (Exception e) {
            LOG.info("auto join is false for cluster" + this._clusterName);
        }
        try {
            z2 = Boolean.valueOf(this._helixManagerProperty.getHelixCloudProperty().getCloudEnabled()).booleanValue();
            LOG.info("instance: " + this._instanceName + " auto-registering " + this._clusterName + " is " + z2);
        } catch (Exception e2) {
            LOG.info("auto registration is false for cluster" + this._clusterName);
        }
        if (ZKUtil.isInstanceSetup(this._zkclient, this._clusterName, this._instanceName, this._instanceType)) {
            this._configAccessor.getInstanceConfig(this._clusterName, this._instanceName).validateTopologySettingInInstanceConfig(this._configAccessor.getClusterConfig(this._clusterName), this._instanceName);
            return;
        }
        if (!z) {
            throw new HelixException("Initial cluster structure is not set up for instance: " + this._instanceName + ", instanceType: " + this._instanceType);
        }
        if (z2) {
            LOG.info(this._instanceName + " is auto-registering cluster: " + this._clusterName);
            String str = getCloudInstanceInformation().get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + this._instanceName;
            composeInstanceConfig = HelixUtil.composeInstanceConfig(this._instanceName);
            composeInstanceConfig.setDomain(str);
        } else {
            LOG.info(this._instanceName + " is auto-joining cluster: " + this._clusterName);
            composeInstanceConfig = HelixUtil.composeInstanceConfig(this._instanceName);
        }
        composeInstanceConfig.validateTopologySettingInInstanceConfig(this._configAccessor.getClusterConfig(this._clusterName), this._instanceName);
        this._helixAdmin.addInstance(this._clusterName, composeInstanceConfig);
    }

    private CloudInstanceInformation getCloudInstanceInformation() {
        String cloudInfoProcessorName = this._helixManagerProperty.getHelixCloudProperty().getCloudInfoProcessorName();
        try {
            CloudInstanceInformationProcessor cloudInstanceInformationProcessor = (CloudInstanceInformationProcessor) Class.forName(CLOUD_PROCESSOR_PATH_PREFIX + this._helixManagerProperty.getHelixCloudProperty().getCloudProvider().toLowerCase() + "." + cloudInfoProcessorName).getConstructor(HelixCloudProperty.class).newInstance(this._helixManagerProperty.getHelixCloudProperty());
            return cloudInstanceInformationProcessor.parseCloudInstanceInformation(cloudInstanceInformationProcessor.fetchCloudInstanceInformation());
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new HelixException("Failed to create a new instance for the class: " + cloudInfoProcessorName, e);
        }
    }

    private void createLiveInstance() {
        boolean z;
        String path = this._keyBuilder.liveInstance(this._instanceName).getPath();
        LiveInstance liveInstance = new LiveInstance(this._instanceName);
        liveInstance.setSessionId(this._sessionId);
        liveInstance.setHelixVersion(this._manager.getVersion());
        liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
        liveInstance.setCurrentTaskThreadPoolSize(TaskUtil.getTargetThreadPoolSize(this._zkclient, this._clusterName, this._instanceName));
        if (this._liveInstanceInfoProvider != null) {
            LOG.info("invoke liveInstanceInfoProvider");
            ZNRecord additionalLiveInstanceInfo = this._liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
            if (additionalLiveInstanceInfo != null) {
                additionalLiveInstanceInfo.merge(liveInstance.getRecord());
                liveInstance = new LiveInstance(new ZNRecord(additionalLiveInstanceInfo, this._instanceName));
                LOG.info("instanceName: " + this._instanceName + ", mergedLiveInstance: " + liveInstance);
            }
        }
        while (true) {
            z = false;
            try {
                this._zkclient.createEphemeral(path, liveInstance.getRecord(), this._sessionId);
                LOG.info("LiveInstance created, path: {}, sessionId: {}", path, liveInstance.getEphemeralOwner());
            } catch (ZkNodeExistsException e) {
                LOG.warn("Found another instance with same instance name: {} in cluster: {}", this._instanceName, this._clusterName);
                if (((ZNRecord) this._zkclient.readData(path, new Stat(), true)) != null) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(this._sessionTimeout + 5000);
                    } catch (InterruptedException e2) {
                        LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.", (Throwable) e2);
                    }
                    z = true;
                    break;
                }
                z = true;
            } catch (ZkSessionMismatchedException e3) {
                throw new HelixException("Failed to create live instance, path: " + path + ". Caused by: " + e3.getMessage());
            }
            if (!z) {
                break;
            }
        }
        if (z) {
            try {
                this._zkclient.createEphemeral(path, liveInstance.getRecord(), this._sessionId);
                LOG.info("LiveInstance created, path: {}, sessionId: {}", path, liveInstance.getEphemeralOwner());
            } catch (ZkNodeExistsException e4) {
                throw new HelixException("Failed to create live instance because instance: " + this._instanceName + " already has a live-instance in cluster: " + this._clusterName + ". Path is: " + path);
            } catch (ZkSessionMismatchedException e5) {
                throw new HelixException("Failed to create live instance, path: " + path + ". Caused by: " + e5.getMessage());
            } catch (Exception e6) {
                throw new HelixException("Failed to create live instance. " + e6.getMessage());
            }
        }
        ParticipantHistory history = getHistory();
        history.reportOnline(this._sessionId, this._manager.getVersion());
        persistHistory(history, false);
    }

    public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor helixDataAccessor, String str, String str2, StateMachineEngine stateMachineEngine, boolean z) {
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        List<String> childNames = helixDataAccessor.getChildNames(keyBuilder.sessions(str));
        for (String str3 : childNames) {
            if (!str3.equals(str2)) {
                for (CurrentState currentState : helixDataAccessor.getChildValues(keyBuilder.currentStates(str, str3), false)) {
                    LOG.info("Carrying over old session: " + str3 + ", resource: " + currentState.getId() + " to current session: " + str2 + ", setToInitState: " + z);
                    String stateModelDefRef = currentState.getStateModelDefRef();
                    if (stateModelDefRef == null) {
                        LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " + currentState);
                    } else if (!stateModelDefRef.equals("Task")) {
                        String initialState = ((StateModelDefinition) helixDataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef))).getInitialState();
                        HashMap hashMap = new HashMap();
                        if (z) {
                            currentState.getPartitionStateMap().keySet().forEach(str4 -> {
                            });
                        } else {
                            StateModelFactory<? extends StateModel> stateModelFactory = stateMachineEngine.getStateModelFactory(stateModelDefRef, currentState.getStateModelFactoryName());
                            currentState.getPartitionStateMap().keySet().forEach(str5 -> {
                                StateModel stateModel = stateModelFactory.getStateModel(currentState.getResourceName(), str5);
                                if (stateModel != null) {
                                    hashMap.put(str5, stateModel.getCurrentState());
                                }
                            });
                        }
                        BaseDataAccessor<ZNRecord> baseDataAccessor = helixDataAccessor.getBaseDataAccessor();
                        String path = keyBuilder.currentState(str, str2, currentState.getResourceName()).getPath();
                        if (currentState.getBucketSize() > 0) {
                            ZNRecord zNRecord = new ZNRecord(currentState.getId());
                            zNRecord.setSimpleFields(currentState.getRecord().getSimpleFields());
                            if (baseDataAccessor.update(path, new CurStateCarryOverUpdater(str2, hashMap, new CurrentState(zNRecord)), AccessOption.PERSISTENT)) {
                                Map<String, ZNRecord> bucketize = new ZNRecordBucketizer(currentState.getBucketSize()).bucketize(currentState.getRecord());
                                ArrayList arrayList = new ArrayList();
                                ArrayList arrayList2 = new ArrayList();
                                for (String str6 : bucketize.keySet()) {
                                    arrayList.add(path + "/" + str6);
                                    arrayList2.add(new CurStateCarryOverUpdater(str2, hashMap, new CurrentState(bucketize.get(str6))));
                                }
                                baseDataAccessor.updateChildren(arrayList, arrayList2, AccessOption.PERSISTENT);
                            }
                        } else {
                            helixDataAccessor.getBaseDataAccessor().update(path, new CurStateCarryOverUpdater(str2, hashMap, currentState), AccessOption.PERSISTENT);
                        }
                    }
                }
            }
        }
        for (String str7 : childNames) {
            if (!str7.equals(str2)) {
                PropertyKey currentStates = keyBuilder.currentStates(str, str7);
                String path2 = currentStates.getPath();
                LOG.info("Removing current states from previous sessions. path: {}", path2);
                if (!helixDataAccessor.removeProperty(currentStates)) {
                    throw new ZkClientException("Failed to delete " + path2);
                }
            }
        }
    }

    private void removePreviousTaskCurrentStates() {
        for (String str : this._dataAccessor.getChildNames(this._keyBuilder.taskCurrentStateSessions(this._instanceName))) {
            if (!str.equals(this._sessionId)) {
                String path = this._keyBuilder.taskCurrentStates(this._instanceName, str).getPath();
                LOG.info("Removing task current states from previous sessions. path: " + path);
                this._zkclient.deleteRecursively(path);
            }
        }
    }

    private void setupMsgHandler() throws Exception {
        this._messagingService.registerMessageHandlerFactory(this._stateMachineEngine.getMessageTypes(), this._stateMachineEngine);
        this._manager.addMessageListener(this._messagingService.getExecutor(), this._instanceName);
        this._stateMachineEngine.registerStateModelFactory("SchedulerTaskQueue", new ScheduledTaskStateModelFactory(this._messagingService.getExecutor()));
        this._messagingService.onConnected();
    }

    private ParticipantHistory getHistory() {
        ParticipantHistory participantHistory = (ParticipantHistory) this._dataAccessor.getProperty(this._keyBuilder.participantHistory(this._instanceName));
        return participantHistory == null ? new ParticipantHistory(this._instanceName) : participantHistory;
    }

    private void persistHistory(ParticipantHistory participantHistory, boolean z) {
        PropertyKey participantHistory2 = this._keyBuilder.participantHistory(this._instanceName);
        if (z ? this._dataAccessor.updateProperty(participantHistory2, zNRecord -> {
            if (zNRecord == null) {
                return null;
            }
            return participantHistory.getRecord();
        }, participantHistory) : this._dataAccessor.setProperty(participantHistory2, participantHistory)) {
            return;
        }
        LOG.error("Failed to persist participant history to zk!");
    }

    public void reset() {
    }

    public void disconnect() {
        try {
            ParticipantHistory history = getHistory();
            history.reportOffline();
            persistHistory(history, true);
        } catch (Exception e) {
            LOG.error("Failed to report participant offline.", (Throwable) e);
        }
        reset();
    }
}
