package org.apache.helix.task;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.shaded.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/task/WorkflowDispatcher.class */
public class WorkflowDispatcher extends AbstractTaskDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) WorkflowDispatcher.class);
    private WorkflowControllerDataProvider _clusterDataCache;
    private JobDispatcher _jobDispatcher;

    public void updateCache(WorkflowControllerDataProvider workflowControllerDataProvider) {
        this._clusterDataCache = workflowControllerDataProvider;
        if (this._jobDispatcher == null) {
            this._jobDispatcher = new JobDispatcher();
        }
        this._jobDispatcher.init(this._manager);
        this._jobDispatcher.updateCache(workflowControllerDataProvider);
        this._jobDispatcher.setClusterStatusMonitor(this._clusterStatusMonitor);
    }

    public void updateWorkflowStatus(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        if (workflowConfig == null) {
            LOG.warn("Workflow configuration is NULL for {}", str);
            return;
        }
        TargetState targetState = workflowConfig.getTargetState();
        if (targetState == TargetState.DELETE) {
            LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", str);
            updateInflightJobs(str, workflowContext, currentStateOutput, bestPossibleStateOutput);
            cleanupWorkflow(str);
            return;
        }
        if (!workflowConfig.isJobQueue() && !TaskConstants.FINAL_STATES.contains(workflowContext.getWorkflowState())) {
            scheduleRebalanceForTimeout(str, workflowContext.getStartTime(), workflowConfig.getTimeout());
            if (!TaskState.TIMED_OUT.equals(workflowContext.getWorkflowState()) && isTimeout(workflowContext.getStartTime(), workflowConfig.getTimeout())) {
                workflowContext.setWorkflowState(TaskState.TIMED_OUT);
                this._clusterDataCache.updateWorkflowContext(str, workflowContext);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (workflowContext.getFinishTime() == -1 && isWorkflowFinished(workflowContext, workflowConfig, this._clusterDataCache.getJobConfigMap(), this._clusterDataCache)) {
            workflowContext.setFinishTime(currentTimeMillis);
            updateWorkflowMonitor(workflowContext, workflowConfig);
            this._clusterDataCache.updateWorkflowContext(str, workflowContext);
        }
        if (workflowContext.getFinishTime() != -1) {
            LOG.debug("Workflow {} is finished.", str);
            updateInflightJobs(str, workflowContext, currentStateOutput, bestPossibleStateOutput);
            long expiry = workflowConfig.getExpiry();
            if (workflowContext.getFinishTime() + expiry <= currentTimeMillis) {
                LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", str);
                cleanupWorkflow(str);
                return;
            } else {
                _rebalanceScheduler.scheduleRebalance(this._manager, str, workflowContext.getFinishTime() + expiry);
                return;
            }
        }
        if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
            HashSet hashSet = new HashSet(workflowContext.getJobStates().keySet());
            hashSet.removeAll(workflowConfig.getJobDag().getAllNodes());
            if (hashSet.size() > 0) {
                workflowContext.setLastJobPurgeTime(System.currentTimeMillis());
                workflowContext.removeJobStates(hashSet);
                workflowContext.removeJobStartTime(hashSet);
            }
        }
        updateInflightJobs(str, workflowContext, currentStateOutput, bestPossibleStateOutput);
        if (TaskConstants.FINAL_STATES.contains(workflowContext.getWorkflowState()) || !TargetState.STOP.equals(targetState)) {
            if (targetState.equals(TargetState.START) && workflowContext.getWorkflowState() == TaskState.STOPPED) {
                workflowContext.setWorkflowState(TaskState.IN_PROGRESS);
            }
            this._clusterDataCache.updateWorkflowContext(str, workflowContext);
            return;
        }
        if (!isWorkflowStopped(workflowContext, workflowConfig) || workflowContext.getWorkflowState() == TaskState.STOPPED) {
            return;
        }
        LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", str, workflowContext.getWorkflowState());
        workflowContext.setWorkflowState(TaskState.STOPPED);
        this._clusterDataCache.updateWorkflowContext(str, workflowContext);
    }

    private void updateInflightJobs(String str, WorkflowContext workflowContext, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        RuntimeJobDag runtimeJobDag = this._clusterDataCache.getTaskDataCache().getRuntimeJobDag(str);
        if (runtimeJobDag == null) {
            LOG.warn("Failed to find runtime job DAG for workflow {}, existing runtime jobs may not be processed correctly for it", str);
            return;
        }
        for (String str2 : runtimeJobDag.getInflightJobList()) {
            if (System.currentTimeMillis() >= workflowContext.getJobStartTime(str2)) {
                processJob(str2, currentStateOutput, bestPossibleStateOutput, workflowContext);
            }
        }
    }

    public void assignWorkflow(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        if (workflowConfig == null) {
            return;
        }
        if (!isWorkflowReadyForSchedule(workflowConfig)) {
            LOG.info("Workflow {} is not ready to schedule, schedule future rebalance at {}", str, Long.valueOf(workflowConfig.getStartTime().getTime()));
            _rebalanceScheduler.scheduleRebalance(this._manager, str, workflowConfig.getStartTime().getTime());
        } else {
            if (scheduleWorkflowIfReady(str, workflowConfig, workflowContext, this._clusterDataCache.getTaskDataCache())) {
                scheduleJobs(str, workflowConfig, workflowContext, this._clusterDataCache.getJobConfigMap(), this._clusterDataCache, currentStateOutput, bestPossibleStateOutput);
            } else {
                LOG.debug("Workflow {} is not ready to be scheduled.", str);
            }
            this._clusterDataCache.updateWorkflowContext(str, workflowContext);
        }
    }

    public WorkflowContext getOrInitializeWorkflowContext(String str, TaskDataCache taskDataCache) {
        WorkflowContext workflowContext = taskDataCache.getWorkflowContext(str);
        if (workflowContext == null) {
            if (taskDataCache.getWorkflowConfig(str) != null) {
                workflowContext = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
                workflowContext.setStartTime(System.currentTimeMillis());
                workflowContext.setName(str);
                LOG.debug("Workflow context is created for " + str);
            } else {
                LOG.error("Workflow context is not created for {}. Workflow config is missing!", str);
            }
        }
        return workflowContext;
    }

    private void scheduleJobs(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, Map<String, JobConfig> map, WorkflowControllerDataProvider workflowControllerDataProvider, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        ScheduleConfig scheduleConfig = workflowConfig.getScheduleConfig();
        if (scheduleConfig != null && scheduleConfig.isRecurring()) {
            LOG.debug("Jobs from recurring workflow {} are not schedule-able", str);
            return;
        }
        int inCompleteJobCount = TaskUtil.getInCompleteJobCount(workflowConfig, workflowContext);
        int i = 0;
        long j = Long.MAX_VALUE;
        JobDag runtimeJobDag = workflowControllerDataProvider.getTaskDataCache().getRuntimeJobDag(str);
        if (runtimeJobDag == null) {
            runtimeJobDag = workflowConfig.getJobDag();
        }
        String nextJob = runtimeJobDag.getNextJob();
        while (true) {
            String str2 = nextJob;
            if (str2 == null) {
                break;
            }
            TaskState jobState = workflowContext.getJobState(str2);
            if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
                LOG.debug("Job {} is already started or completed.", str2);
                processJob(str2, currentStateOutput, bestPossibleStateOutput, workflowContext);
                nextJob = runtimeJobDag.getNextJob();
            } else {
                if (workflowConfig.isJobQueue() && i >= workflowConfig.getParallelJobs()) {
                    LOG.debug("Workflow {} already have enough job in progress, scheduledJobs(s)={}, stop scheduling more jobs", str, Integer.valueOf(i));
                    break;
                }
                if (isJobReadyToSchedule(str2, workflowConfig, workflowContext, inCompleteJobCount, map, workflowControllerDataProvider, workflowControllerDataProvider.getAssignableInstanceManager())) {
                    long computeStartTimeForJob = computeStartTimeForJob(workflowContext, str2, map.get(str2));
                    if (System.currentTimeMillis() >= workflowContext.getJobStartTime(str2)) {
                        workflowContext.setJobState(str2, TaskState.NOT_STARTED);
                        processJob(str2, currentStateOutput, bestPossibleStateOutput, workflowContext);
                        i++;
                    } else {
                        j = Math.min(j, computeStartTimeForJob);
                    }
                }
                nextJob = runtimeJobDag.getNextJob();
            }
        }
        if (j < (_rebalanceScheduler.getRebalanceTime(str) == -1 ? Long.MAX_VALUE : _rebalanceScheduler.getRebalanceTime(str))) {
            _rebalanceScheduler.scheduleRebalance(this._manager, str, j);
        }
    }

    private void processJob(String str, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput, WorkflowContext workflowContext) {
        this._clusterDataCache.getTaskDataCache().dispatchJob(str);
        try {
            updateBestPossibleStateOutput(str, this._jobDispatcher.processJobStatusUpdateAndAssignment(str, currentStateOutput, workflowContext), bestPossibleStateOutput);
        } catch (Exception e) {
            LogUtil.logWarn(LOG, this._clusterDataCache.getClusterEventId(), String.format("Failed to compute job assignment for job %s", str), e);
        }
    }

    public void processJobForDrop(String str, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        JobConfig jobConfig = this._clusterDataCache.getJobConfig(str);
        if (jobConfig == null || this._clusterDataCache.getWorkflowConfig(jobConfig.getWorkflow()) == null || this._clusterDataCache.getWorkflowContext(jobConfig.getWorkflow()) == null) {
            updateBestPossibleStateOutput(str, buildEmptyAssignment(str, currentStateOutput), bestPossibleStateOutput);
        }
    }

    private boolean scheduleWorkflowIfReady(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, TaskDataCache taskDataCache) {
        WorkflowContext workflowContext2;
        if (workflowConfig == null || workflowConfig.getScheduleConfig() == null) {
            return true;
        }
        ScheduleConfig scheduleConfig = workflowConfig.getScheduleConfig();
        Date startTime = scheduleConfig.getStartTime();
        long time = new Date().getTime();
        long time2 = startTime.getTime() - time;
        if (time2 > 0) {
            _rebalanceScheduler.scheduleRebalance(this._manager, str, startTime.getTime());
            return false;
        }
        if (!scheduleConfig.isRecurring()) {
            long rebalanceTime = _rebalanceScheduler.getRebalanceTime(str);
            if (rebalanceTime <= 0 || time <= rebalanceTime) {
                return true;
            }
            _rebalanceScheduler.removeScheduledRebalance(str);
            return true;
        }
        if (!workflowConfig.getTargetState().equals(TargetState.START)) {
            LOG.debug("Skip scheduling since the workflow {} has not been started", str);
            return false;
        }
        String lastScheduledSingleWorkflow = workflowContext.getLastScheduledSingleWorkflow();
        if (lastScheduledSingleWorkflow != null && (workflowContext2 = taskDataCache.getWorkflowContext(lastScheduledSingleWorkflow)) != null && workflowContext2.getFinishTime() == -1) {
            LOG.info("Skip scheduling workflow {} since last schedule {} has not completed yet.", str, lastScheduledSingleWorkflow);
            return false;
        }
        long millis = scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval().longValue());
        long time3 = (millis * ((-time2) / millis)) + startTime.getTime();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
        simpleDateFormat.setTimeZone(TimeZone.getTimeZone(HftpFileSystem.HFTP_TIMEZONE));
        String str2 = str + "_" + simpleDateFormat.format(new Date(time3));
        LOG.debug("Ready to start workflow {}", str2);
        if (!str2.equals(lastScheduledSingleWorkflow)) {
            Workflow cloneWorkflow = cloneWorkflow(this._manager, str, str2, new Date(time3));
            TaskDriver taskDriver = new TaskDriver(this._manager);
            if (cloneWorkflow != null) {
                try {
                    taskDriver.start(cloneWorkflow);
                } catch (Exception e) {
                    LOG.error("Failed to schedule cloned workflow {}. ", str2, e);
                    this._clusterStatusMonitor.updateWorkflowCounters(cloneWorkflow.getWorkflowConfig(), TaskState.FAILED);
                }
            }
            workflowContext.setLastScheduledSingleWorkflow(str2);
        }
        _rebalanceScheduler.scheduleRebalance(this._manager, str, time3 + millis);
        return false;
    }

    public static Workflow cloneWorkflow(HelixManager helixManager, String str, String str2, Date date) {
        HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
        Map childValuesMap = helixDataAccessor.getChildValuesMap(helixDataAccessor.keyBuilder().resourceConfigs(), true);
        if (!childValuesMap.containsKey(str)) {
            LOG.error("No such workflow named {}", str);
            return null;
        }
        if (childValuesMap.containsKey(str2)) {
            LOG.error("Workflow with name {} already exists!", str2);
            return null;
        }
        WorkflowConfig.Builder fromMap = WorkflowConfig.Builder.fromMap(((HelixProperty) childValuesMap.get(str)).getRecord().getSimpleFields());
        if (date != null) {
            fromMap.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(date));
        }
        fromMap.setTerminable(true);
        WorkflowConfig build = fromMap.build();
        JobDag jobDag = build.getJobDag();
        Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
        Workflow.Builder builder = new Workflow.Builder(str2);
        builder.setWorkflowConfig(build);
        for (String str3 : jobDag.getAllNodes()) {
            if (childValuesMap.containsKey(str3)) {
                String denamespacedJobName = TaskUtil.getDenamespacedJobName(str, str3);
                HelixProperty helixProperty = (HelixProperty) childValuesMap.get(str3);
                JobConfig.Builder fromMap2 = JobConfig.Builder.fromMap(helixProperty.getRecord().getSimpleFields());
                fromMap2.setWorkflow(str2);
                Map<String, Map<String, String>> mapFields = helixProperty.getRecord().getMapFields();
                LinkedList newLinkedList = Lists.newLinkedList();
                Iterator<Map<String, String>> it2 = mapFields.values().iterator();
                while (it2.hasNext()) {
                    newLinkedList.add(TaskConfig.Builder.from(it2.next()));
                }
                fromMap2.addTaskConfigs(newLinkedList);
                builder.addJob(denamespacedJobName, fromMap2);
                Set<String> set = parentsToChildren.get(str3);
                if (set != null) {
                    Iterator<String> it3 = set.iterator();
                    while (it3.hasNext()) {
                        builder.addParentChildDependency(denamespacedJobName, TaskUtil.getDenamespacedJobName(str, it3.next()));
                    }
                }
            }
        }
        return builder.build();
    }

    private void cleanupWorkflow(String str) {
        LOG.info("Cleaning up workflow: " + str);
        WorkflowConfig workflowConfig = this._clusterDataCache.getWorkflowConfig(str);
        if (!workflowConfig.isTerminable() && workflowConfig.getTargetState() != TargetState.DELETE) {
            LOG.info("Did not clean up workflow {} because neither the workflow is non-terminable nor is set to DELETE.", str);
            return;
        }
        Set<String> allNodes = workflowConfig.getJobDag().getAllNodes();
        _rebalanceScheduler.removeScheduledRebalance(str);
        Iterator<String> it2 = allNodes.iterator();
        while (it2.hasNext()) {
            _rebalanceScheduler.removeScheduledRebalance(it2.next());
        }
        if (TaskUtil.removeWorkflow(this._manager.getHelixDataAccessor(), this._manager.getHelixPropertyStore(), str, allNodes)) {
            removeContexts(str, allNodes, this._clusterDataCache.getTaskDataCache());
        } else {
            LOG.warn("Failed to clean up workflow {}", str);
        }
    }

    private void removeContexts(String str, Set<String> set, TaskDataCache taskDataCache) {
        if (set != null) {
            Iterator<String> it2 = set.iterator();
            while (it2.hasNext()) {
                taskDataCache.removeContext(it2.next());
            }
        }
        taskDataCache.removeContext(str);
    }

    private long computeStartTimeForJob(WorkflowContext workflowContext, String str, JobConfig jobConfig) {
        long jobStartTime = workflowContext.getJobStartTime(str);
        if (jobStartTime < 0) {
            long currentTimeMillis = System.currentTimeMillis();
            if (jobConfig.getExecutionDelay() >= 0) {
                currentTimeMillis += jobConfig.getExecutionDelay();
            }
            jobStartTime = Math.max(currentTimeMillis, jobConfig.getExecutionStart());
            workflowContext.setJobStartTime(str, jobStartTime);
        }
        return jobStartTime;
    }
}
