package org.apache.pinot.controller.helix.core.minion;

import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.TaskGeneratorMostRecentRunInfo;
import org.apache.pinot.common.minion.TaskManagerStatusCache;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

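/**
 * Controller periodic task that generates minion tasks for tables through the registered
 * {@link PinotTaskGenerator}s and submits them through {@link PinotHelixTaskResourceManager}. When the
 * scheduler is enabled in the controller config it also maintains Quartz cron jobs, keyed by table and
 * task type, based on the "schedule" entries found in the table task configs.
 *
 * <p>Decompiled from: org/apache/pinot/controller/helix/core/minion/PinotTaskManager.class
 */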
public class PinotTaskManager extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) PinotTaskManager.class);
    public static final String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
    public static final String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
    public static final String SCHEDULE_KEY = "schedule";
    private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
    private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
    private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context";
    private final PinotHelixTaskResourceManager _helixTaskResourceManager;
    private final ClusterInfoAccessor _clusterInfoAccessor;
    private final TaskGeneratorRegistry _taskGeneratorRegistry;
    private final Scheduler _scheduler;
    private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap;
    private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap;
    private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap;
    private final Map<TaskState, Integer> _taskStateToCountMap;
    private final ZkTableConfigChangeListener _zkTableConfigChangeListener;
    private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;

    /* loaded from: input_file:org/apache/pinot/controller/helix/core/minion/PinotTaskManager$ZkTableConfigChangeListener.class */
    /**
     * Child listener registered on the table config parent path. Each notification is handed to
     * {@code checkTableConfigChanges} of the enclosing task manager.
     *
     * <p>The callback is {@code synchronized} on this listener instance, which is the same monitor the
     * enclosing constructor holds while it subscribes and performs its initial check.
     */
    private class ZkTableConfigChangeListener implements IZkChildListener {
        private ZkTableConfigChangeListener() {
        }

        /**
         * @param str  path whose children changed (not used here)
         * @param list current child names under that path
         */
        @Override
        public synchronized void handleChildChange(String str, List<String> list) {
            checkTableConfigChanges(list);
        }
    }

    /**
     * Builds the task manager. When the controller config enables the task manager scheduler, a Quartz
     * scheduler is created and started, a child listener is subscribed on the table config parent path, and
     * the tables that already exist are processed once. Otherwise the scheduler is left {@code null}.
     *
     * @throws RuntimeException if the Quartz scheduler cannot be created or started
     */
    public PinotTaskManager(PinotHelixTaskResourceManager pinotHelixTaskResourceManager, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache) {
        super(PINOT_TASK_MANAGER_KEY, controllerConf.getTaskManagerFrequencyInSeconds(), controllerConf.getPinotTaskManagerInitialDelaySeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
        _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
        _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>();
        _taskStateToCountMap = new ConcurrentHashMap<>();
        _zkTableConfigChangeListener = new ZkTableConfigChangeListener();
        _helixTaskResourceManager = pinotHelixTaskResourceManager;
        _taskManagerStatusCache = taskManagerStatusCache;
        _clusterInfoAccessor = new ClusterInfoAccessor(pinotHelixResourceManager, pinotHelixTaskResourceManager, controllerConf, controllerMetrics, leadControllerManager);
        _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);

        if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
            try {
                _scheduler = new StdSchedulerFactory().getScheduler();
                _scheduler.start();
                // Hold the listener's monitor so a child-change callback cannot interleave with the
                // initial pass over the existing tables below.
                synchronized (_zkTableConfigChangeListener) {
                    LOGGER.info("Check and subscribe to tables change under PropertyStore path: {}", TABLE_CONFIG_PARENT_PATH);
                    _pinotHelixResourceManager.getPropertyStore().subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, _zkTableConfigChangeListener);
                    List<String> existingTables = _pinotHelixResourceManager.getPropertyStore().getChildNames(TABLE_CONFIG_PARENT_PATH, AccessOption.PERSISTENT);
                    if (CollectionUtils.isNotEmpty(existingTables)) {
                        checkTableConfigChanges(existingTables);
                    }
                }
            } catch (SchedulerException e) {
                throw new RuntimeException("Caught exception while setting up the scheduler", e);
            }
        } else {
            _scheduler = null;
        }
    }

    /**
     * Creates and submits an ad-hoc task of the given type for a table.
     *
     * @param str  task type
     * @param str2 table name, with or without a type suffix; without one, both the offline and the realtime
     *             variants are considered if they exist
     * @param str3 task name; when {@code null} a name of the form {@code <table>_<random UUID>} is generated
     * @param map  task configs handed to the task generator; the {@code "minionInstanceTag"} entry, if
     *             present, selects the minion tag used for submission
     * @return map from table name with type to the value returned by the task submission for that table
     * @throws TaskAlreadyExistsException if a task with the resulting parent task name already has a state
     * @throws TableNotFoundException     if no matching table exists
     * @throws UnknownTaskTypeException   if no generator is registered for the task type
     * @throws NoTaskScheduledException   if the generator produced no task for any matching table
     */
    public Map<String, String> createTask(String str, String str2, @Nullable String str3, Map<String, String> map) throws Exception {
        String taskName = str3;
        if (taskName == null) {
            taskName = str2 + "_" + UUID.randomUUID();
            LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
        }
        String minionInstanceTag = map.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
        _helixTaskResourceManager.ensureTaskQueueExists(str);
        addTaskTypeMetricsUpdaterIfNeeded(str);

        String parentTaskName = _helixTaskResourceManager.getParentTaskName(str, taskName);
        TaskState existingState = _helixTaskResourceManager.getTaskState(parentTaskName);
        if (existingState != null) {
            throw new TaskAlreadyExistsException("Task [" + taskName + "] of type [" + str + "] is already created. Current state is " + existingState);
        }

        // Resolve the requested name to the concrete tables (with type) that actually exist.
        List<String> tableNamesWithType = new ArrayList<>();
        if (TableNameBuilder.getTableTypeFromTableName(str2) != null) {
            if (_pinotHelixResourceManager.hasTable(str2)) {
                tableNamesWithType.add(str2);
            }
        } else {
            String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(str2);
            if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
                tableNamesWithType.add(offlineTableName);
            }
            String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(str2);
            if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
                tableNamesWithType.add(realtimeTableName);
            }
        }
        if (tableNamesWithType.isEmpty()) {
            throw new TableNotFoundException("'tableName' " + str2 + " is not found");
        }

        PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(str);
        if (taskGenerator == null) {
            throw new UnknownTaskTypeException("Task type: " + str + " is not registered, cannot enable it for table: " + str2);
        }

        Map<String, String> submittedTasks = new HashMap<>();
        for (String tableNameWithType : tableNamesWithType) {
            TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
            LOGGER.info("Trying to create tasks of type: {}, table: {}", str, tableNameWithType);
            List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, map);
            if (pinotTaskConfigs.isEmpty()) {
                LOGGER.warn("No ad-hoc task generated for task type: {}", str);
                continue;
            }
            LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", str, pinotTaskConfigs);
            _controllerMetrics.addMeteredTableValue(str, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1L);
            submittedTasks.put(tableNameWithType, _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance()));
        }
        if (submittedTasks.isEmpty()) {
            throw new NoTaskScheduledException("No task scheduled for 'tableName': " + str2);
        }
        return submittedTasks;
    }

    /**
     * Reconciles the set of tracked tables with the current children of the table config path: tables
     * that are not tracked yet get subscribed, tracked tables that are no longer listed get cleaned up.
     *
     * @param tableNamesWithType current child names under the table config parent path
     */
    private void checkTableConfigChanges(List<String> tableNamesWithType) {
        LOGGER.info("Checking task config changes in table configs");
        if (_tableTaskSchedulerUpdaterMap.isEmpty()) {
            // Nothing tracked yet: every listed table is new.
            for (String tableNameWithType : tableNamesWithType) {
                subscribeTableConfigChanges(tableNameWithType);
            }
            return;
        }

        // Start from everything currently tracked; whatever is still in this set after the pass below
        // was not listed any more.
        HashSet<String> tablesToCleanUp = new HashSet<>(_tableTaskSchedulerUpdaterMap.keySet());
        HashSet<String> tablesToSubscribe = new HashSet<>();
        for (String tableNameWithType : tableNamesWithType) {
            boolean alreadyTracked = tablesToCleanUp.remove(tableNameWithType);
            if (!alreadyTracked) {
                tablesToSubscribe.add(tableNameWithType);
            }
        }
        for (String tableNameWithType : tablesToSubscribe) {
            subscribeTableConfigChanges(tableNameWithType);
        }
        if (!tablesToCleanUp.isEmpty()) {
            LOGGER.info("Found tables to clean up cron task scheduler: {}", tablesToCleanUp);
            for (String tableNameWithType : tablesToCleanUp) {
                cleanUpCronTaskSchedulerForTable(tableNameWithType);
            }
        }
    }

    /**
     * Returns the property store path of a table's config node, i.e. {@code /CONFIGS/TABLE/<name>}.
     *
     * @param tableNameWithType name of the child node under the table config parent path
     */
    private String getPropertyStorePathForTable(String tableNameWithType) {
        final String tableConfigPathPrefix = "/CONFIGS/TABLE/";
        return tableConfigPathPrefix + tableNameWithType;
    }

    /**
     * Returns the property store path produced by substituting the task type into
     * {@code TASK_QUEUE_PATH_PATTERN}.
     *
     * @param taskType task type whose queue context path is wanted
     */
    private String getPropertyStorePathForTaskQueue(String taskType) {
        String taskQueueContextPath = String.format(TASK_QUEUE_PATH_PATTERN, taskType);
        return taskQueueContextPath;
    }

    /**
     * Stops tracking a table: unsubscribes its table config data listener (if one is registered), removes
     * its cron jobs from the scheduler and forgets the listener.
     *
     * @param str table name, as used for the table's config node
     */
    public synchronized void cleanUpCronTaskSchedulerForTable(String str) {
        LOGGER.info("Cleaning up task in scheduler for table {}", str);
        TableTaskSchedulerUpdater registeredUpdater = _tableTaskSchedulerUpdaterMap.get(str);
        if (registeredUpdater != null) {
            String tableConfigPath = getPropertyStorePathForTable(str);
            _pinotHelixResourceManager.getPropertyStore().unsubscribeDataChanges(tableConfigPath, registeredUpdater);
        }
        removeAllTasksFromCronExpressions(str);
        _tableTaskSchedulerUpdaterMap.remove(str);
    }

    /**
     * Deletes every scheduler job whose job name equals the given table name, decrements the
     * scheduled-job gauge for each of them, and drops the table's remembered cron expressions.
     *
     * <p>If the job keys cannot be listed, the error is logged and nothing else is changed. A failure to
     * delete an individual job is logged and the remaining jobs are still processed.
     *
     * @param tableNameWithType table name used as the job name
     */
    private synchronized void removeAllTasksFromCronExpressions(String tableNameWithType) {
        try {
            for (JobKey jobKey : _scheduler.getJobKeys(GroupMatcher.anyJobGroup())) {
                if (!jobKey.getName().equals(tableNameWithType)) {
                    continue;
                }
                try {
                    _scheduler.deleteJob(jobKey);
                    // The job group carries the task type, see getCronJobName(table, taskType) usages.
                    String cronJobName = getCronJobName(tableNameWithType, jobKey.getGroup());
                    _controllerMetrics.addValueToTableGauge(cronJobName, ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
                } catch (SchedulerException deleteFailure) {
                    LOGGER.error("Got exception when deleting the scheduled job - {}", jobKey, deleteFailure);
                }
            }
            _tableTaskTypeToCronExpressionMap.remove(tableNameWithType);
        } catch (SchedulerException listFailure) {
            LOGGER.error("Got exception when fetching all jobKeys", listFailure);
        }
    }

    /**
     * Builds the name used for a cron job in metrics: the two arguments joined by a dot.
     *
     * @param str  first component (callers in this class pass the table name)
     * @param str2 second component (callers in this class pass the task type)
     * @return {@code str + "." + str2}
     */
    public static String getCronJobName(String str, String str2) {
        return str + "." + str2;
    }

    /**
     * Starts tracking a table if it is not tracked yet: registers a data listener on the table's config
     * node, remembers it, and brings the table's cron jobs in line with its current config. A failure in
     * that last step is logged and not propagated.
     *
     * @param str table name, as used for the table's config node
     */
    public synchronized void subscribeTableConfigChanges(String str) {
        if (!_tableTaskSchedulerUpdaterMap.containsKey(str)) {
            TableTaskSchedulerUpdater updater = new TableTaskSchedulerUpdater(str, this);
            String tableConfigPath = getPropertyStorePathForTable(str);
            _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(tableConfigPath, updater);
            _tableTaskSchedulerUpdaterMap.put(str, updater);
            try {
                updateCronTaskScheduler(str);
            } catch (Exception e) {
                LOGGER.error("Failed to create cron task in scheduler for table: {}", str, e);
            }
        }
    }

    /**
     * Re-reads a table's config and aligns its cron jobs with the "schedule" entries of its task configs.
     * When the table config, its task config, or the task type config map is missing, all cron jobs of the
     * table are removed instead.
     *
     * @param str table name used to look up the table config
     */
    public synchronized void updateCronTaskScheduler(String str) {
        LOGGER.info("Trying to update task schedule for table: {}", str);

        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(str);
        if (tableConfig == null) {
            LOGGER.info("tableConfig is null, trying to remove all the tasks for table {} if any", str);
            removeAllTasksFromCronExpressions(str);
            return;
        }

        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
        if (taskConfig == null) {
            LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", str);
            removeAllTasksFromCronExpressions(str);
            return;
        }

        Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
        if (taskTypeConfigsMap == null) {
            LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", str);
            removeAllTasksFromCronExpressions(str);
            return;
        }

        Map<String, String> cronExpressionByTaskType = getTaskToCronExpressionMap(taskTypeConfigsMap);
        LOGGER.info("Got taskToCronExpressionMap {} ", cronExpressionByTaskType);
        updateCronTaskScheduler(str, cronExpressionByTaskType);
    }

    /**
     * Brings the scheduler's cron jobs for one table in line with the desired task-type to cron-expression
     * mapping, then records that mapping as the table's current one.
     *
     * <ul>
     *   <li>Task types that are no longer present have their job deleted.</li>
     *   <li>Task types whose expression changed (compared case-insensitively) are rescheduled.</li>
     *   <li>Task types that are new are scheduled.</li>
     * </ul>
     *
     * <p>Failures are handled per task type: they are logged and the remaining task types are still
     * processed. This includes the {@link RuntimeException} that
     * {@code CronScheduleBuilder.cronSchedule} throws for an invalid expression, which previously escaped
     * this method, skipped the remaining task types and skipped the bookkeeping at the end.
     *
     * @param tableNameWithType       table whose jobs are updated; used as job/trigger name
     * @param taskToCronExpressionMap desired cron expression per task type
     */
    private void updateCronTaskScheduler(String tableNameWithType, Map<String, String> taskToCronExpressionMap) {
        Map<String, String> existingCronExpressions = _tableTaskTypeToCronExpressionMap.get(tableNameWithType);
        if (existingCronExpressions == null || existingCronExpressions.isEmpty()) {
            for (Map.Entry<String, String> desired : taskToCronExpressionMap.entrySet()) {
                scheduleJobAndLogFailure(tableNameWithType, desired.getKey(), desired.getValue());
            }
        } else {
            for (Map.Entry<String, String> existing : existingCronExpressions.entrySet()) {
                String taskType = existing.getKey();
                String desiredCronExpression = taskToCronExpressionMap.get(taskType);
                if (desiredCronExpression == null) {
                    try {
                        // deleteJob reports whether a job was actually removed. Only then was the gauge
                        // incremented earlier, so only then is it decremented.
                        if (_scheduler.deleteJob(JobKey.jobKey(tableNameWithType, taskType))) {
                            _controllerMetrics.addValueToTableGauge(getCronJobName(tableNameWithType, taskType), ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, -1L);
                        }
                    } catch (SchedulerException e) {
                        LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableNameWithType, taskType, e);
                    }
                } else if (!existing.getValue().equalsIgnoreCase(desiredCronExpression)) {
                    try {
                        TriggerKey triggerKey = TriggerKey.triggerKey(tableNameWithType, taskType);
                        Trigger newTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(CronScheduleBuilder.cronSchedule(desiredCronExpression)).build();
                        if (_scheduler.rescheduleJob(triggerKey, newTrigger) == null) {
                            // No trigger to replace, e.g. because the previous expression could not be
                            // scheduled. Fall back to a fresh schedule so the new expression takes effect.
                            LOGGER.warn("No existing trigger found for table {}, task type {}, scheduling a new job", tableNameWithType, taskType);
                            scheduleJob(tableNameWithType, taskType, desiredCronExpression);
                        }
                    } catch (SchedulerException | RuntimeException e) {
                        LOGGER.error("Failed to update scheduled job for table {}, task type {}", tableNameWithType, taskType, e);
                    }
                }
            }
            for (Map.Entry<String, String> desired : taskToCronExpressionMap.entrySet()) {
                if (!existingCronExpressions.containsKey(desired.getKey())) {
                    scheduleJobAndLogFailure(tableNameWithType, desired.getKey(), desired.getValue());
                }
            }
        }
        _tableTaskTypeToCronExpressionMap.put(tableNameWithType, taskToCronExpressionMap);
    }

    /** Schedules one cron job and logs, rather than propagates, any scheduling or cron parsing failure. */
    private void scheduleJobAndLogFailure(String tableNameWithType, String taskType, String cronExpression) {
        try {
            scheduleJob(tableNameWithType, taskType, cronExpression);
        } catch (SchedulerException | RuntimeException e) {
            LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableNameWithType, taskType, cronExpression, e);
        }
    }

    /**
     * Schedules a cron job for a table and task type unless a job with that key already exists.
     *
     * <p>If the existence check itself fails, the failure is logged and the job is treated as absent.
     * On success the scheduled-job gauge for the table/task type is incremented.
     *
     * <p>The cron expression is parsed while the trigger is built. An invalid expression makes
     * {@code CronScheduleBuilder.cronSchedule} throw a {@link RuntimeException}; that is reported here as a
     * {@link SchedulerException} so callers that handle the declared exception also handle bad expressions.
     *
     * @param tableNameWithType table name, used as job and trigger name
     * @param taskType          task type, used as job and trigger group
     * @param cronExpression    cron expression for the trigger
     * @throws SchedulerException if the cron expression is invalid or the scheduler rejects the job
     */
    private void scheduleJob(String tableNameWithType, String taskType, String cronExpression) throws SchedulerException {
        boolean jobExists = false;
        try {
            jobExists = _scheduler.checkExists(JobKey.jobKey(tableNameWithType, taskType));
        } catch (SchedulerException e) {
            LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableNameWithType, taskType, e);
        }
        if (jobExists) {
            return;
        }

        LOGGER.info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", cronExpression, tableNameWithType, taskType);
        Trigger trigger;
        try {
            trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableNameWithType, taskType)).withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).build();
        } catch (RuntimeException e) {
            LOGGER.error("Failed to parse Cron expression - " + cronExpression, e);
            throw new SchedulerException("Failed to parse Cron expression - " + cronExpression, e);
        }

        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
        jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, _leadControllerManager);
        try {
            _scheduler.scheduleJob(JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableNameWithType, taskType).setJobData(jobDataMap).build(), trigger);
            _controllerMetrics.addValueToTableGauge(getCronJobName(tableNameWithType, taskType), ControllerGauge.CRON_SCHEDULER_JOB_SCHEDULED, 1L);
            LOGGER.info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableNameWithType, taskType, trigger.getNextFireTime());
        } catch (Exception e) {
            // The expression parsed fine at this point, so this is a scheduling failure.
            LOGGER.error("Failed to schedule job for table: {}, task type: {}, cron expression: {}", tableNameWithType, taskType, cronExpression, e);
            throw e;
        }
    }

    /**
     * Extracts the cron expression of every task type that has one.
     *
     * @param taskTypeConfigsMap per task type config map
     * @return map from task type to the non-null value stored under {@code SCHEDULE_KEY}; task types with a
     *         {@code null} config or without such a value are left out
     */
    private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
        Map<String, String> cronExpressionByTaskType = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> taskTypeEntry : taskTypeConfigsMap.entrySet()) {
            Map<String, String> taskConfigs = taskTypeEntry.getValue();
            if (taskConfigs == null) {
                continue;
            }
            String cronExpression = taskConfigs.get(SCHEDULE_KEY);
            if (cronExpression != null) {
                cronExpressionByTaskType.put(taskTypeEntry.getKey(), cronExpression);
            }
        }
        return cronExpressionByTaskType;
    }

    /** Returns the cluster info accessor created by the constructor. */
    public ClusterInfoAccessor getClusterInfoAccessor() {
        return _clusterInfoAccessor;
    }

    /** Returns the registry holding the task generators known to this manager. */
    public TaskGeneratorRegistry getTaskGeneratorRegistry() {
        return _taskGeneratorRegistry;
    }

    /**
     * Adds a task generator to this manager's registry.
     *
     * @param pinotTaskGenerator generator to register
     */
    public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
        _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
    }

    /**
     * Schedules tasks of every configured task type for all tables, as a non-leader invocation.
     *
     * @return map from task type to the value returned by the task submission, or {@code null} for a task
     *         type for which nothing was submitted
     */
    public synchronized Map<String, String> scheduleTasks() {
        List<String> allTables = _pinotHelixResourceManager.getAllTables();
        return scheduleTasks(allTables, false);
    }

    /**
     * Groups the given tables by the task types configured on them and schedules each task type once for
     * its group of tables.
     *
     * <p>Tables without a table config, without a task config, or whose task config has no task type
     * config map are skipped. The last check matches the null handling in
     * {@code updateCronTaskScheduler}; without it a single such table aborted scheduling for all tables.
     *
     * @param tableNamesWithType tables to consider
     * @param isLeader           passed through to the per-task-type scheduling
     * @return map from task type to the value returned by the task submission; the value is {@code null}
     *         when nothing was submitted or when no generator is registered for the task type
     */
    private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWithType, boolean isLeader) {
        _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

        Map<String, List<TableConfig>> tableConfigsByTaskType = new HashMap<>();
        for (String tableNameWithType : tableNamesWithType) {
            TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null || tableConfig.getTaskConfig() == null) {
                continue;
            }
            Map<String, Map<String, String>> taskTypeConfigsMap = tableConfig.getTaskConfig().getTaskTypeConfigsMap();
            if (taskTypeConfigsMap == null) {
                continue;
            }
            for (String taskType : taskTypeConfigsMap.keySet()) {
                tableConfigsByTaskType.computeIfAbsent(taskType, k -> new ArrayList<>()).add(tableConfig);
            }
        }

        Map<String, String> tasksScheduled = new HashMap<>();
        for (Map.Entry<String, List<TableConfig>> entry : tableConfigsByTaskType.entrySet()) {
            String taskType = entry.getKey();
            List<TableConfig> enabledTableConfigs = entry.getValue();
            PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
            if (taskGenerator != null) {
                _helixTaskResourceManager.ensureTaskQueueExists(taskType);
                addTaskTypeMetricsUpdaterIfNeeded(taskType);
                tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader));
            } else {
                List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size());
                for (TableConfig enabledTableConfig : enabledTableConfigs) {
                    enabledTables.add(enabledTableConfig.getTableName());
                }
                LOGGER.warn("Task type: {} is not registered, cannot enable it for tables: {}", taskType, enabledTables);
                // Keep the task type in the result so callers can see it was requested but not scheduled.
                tasksScheduled.put(taskType, null);
            }
        }
        return tasksScheduled;
    }

    /**
     * Generates the tasks of one task type for the given tables and submits them.
     *
     * <p>After a successful generation, a success timestamp is recorded in the status cache for every
     * table. If anything in the process throws, the exception's stack trace is recorded as an error run
     * message for every table and the exception is rethrown.
     *
     * @param taskGenerator       generator of the task type to schedule
     * @param enabledTableConfigs configs of the tables to generate tasks for
     * @param isLeader            when {@code false}, the generator's {@code nonLeaderCleanUp()} is invoked
     *                            after generation
     * @return the value returned by the task submission, or {@code null} if no task was generated
     */
    @Nullable
    private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, boolean isLeader) {
        String taskType = taskGenerator.getTaskType();
        LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskType, isLeader);
        try {
            List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
            for (TableConfig tableConfig : enabledTableConfigs) {
                _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskType, runInfo -> runInfo.addSuccessRunTs(System.currentTimeMillis()));
            }
            if (!isLeader) {
                taskGenerator.nonLeaderCleanUp();
            }
            int numTasks = pinotTaskConfigs.size();
            if (numTasks <= 0) {
                LOGGER.info("No task to schedule for task type: {}", taskType);
                return null;
            }
            LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
            _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
            return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance());
        } catch (Exception e) {
            // Render the stack trace once and store the same text for every affected table.
            StringWriter errors = new StringWriter();
            try (PrintWriter printWriter = new PrintWriter(errors)) {
                e.printStackTrace(printWriter);
            }
            String stackTrace = errors.toString();
            for (TableConfig tableConfig : enabledTableConfigs) {
                _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskType, runInfo -> runInfo.addErrorRunMessage(System.currentTimeMillis(), stackTrace));
            }
            throw e;
        }
    }

    /**
     * Schedules tasks of every task type configured on a single table, as a non-leader invocation.
     *
     * @param str table name used to look up the table config
     * @return map from task type to the value returned by the task submission ({@code null} values mark
     *         task types for which nothing was submitted)
     */
    @Nullable
    public synchronized Map<String, String> scheduleTasks(String str) {
        List<String> singleTable = Collections.singletonList(str);
        return scheduleTasks(singleTable, false);
    }

    /**
     * Schedules tasks of one task type for every table that has that task type enabled, as a non-leader
     * invocation.
     *
     * @param str task type
     * @return the value returned by the task submission, or {@code null} if no task was generated
     * @throws IllegalStateException if no generator is registered for the task type
     */
    @Nullable
    public synchronized String scheduleTask(String str) {
        PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(str);
        Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", str);

        List<TableConfig> enabledTableConfigs = new ArrayList<>();
        for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null || tableConfig.getTaskConfig() == null) {
                continue;
            }
            if (tableConfig.getTaskConfig().isTaskTypeEnabled(str)) {
                enabledTableConfigs.add(tableConfig);
            }
        }

        _helixTaskResourceManager.ensureTaskQueueExists(str);
        addTaskTypeMetricsUpdaterIfNeeded(str);
        return scheduleTask(taskGenerator, enabledTableConfigs, false);
    }

    /**
     * Schedules tasks of one task type for one table, as a non-leader invocation.
     *
     * @param str  task type
     * @param str2 table name used to look up the table config
     * @return the value returned by the task submission, or {@code null} if no task was generated
     * @throws IllegalStateException if the task type has no registered generator, the table config cannot
     *                               be found, or the table does not have the task type enabled
     */
    @Nullable
    public synchronized String scheduleTask(String str, String str2) {
        PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(str);
        Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", str);

        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(str2);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", str2);

        boolean taskTypeEnabled = tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig().isTaskTypeEnabled(str);
        Preconditions.checkState(taskTypeEnabled, "Table: %s does not have task type: %s enabled", str2, str);

        _helixTaskResourceManager.ensureTaskQueueExists(str);
        addTaskTypeMetricsUpdaterIfNeeded(str);
        List<TableConfig> singleTableConfig = Collections.singletonList(tableConfig);
        return scheduleTask(taskGenerator, singleTableConfig, false);
    }

    /**
     * Periodic-task hook: schedules tasks for the given tables as a leader invocation. The returned
     * scheduling result is discarded.
     *
     * @param list       tables to process
     * @param properties not used by this implementation
     */
    @Override
    protected void processTables(List<String> list, Properties properties) {
        final boolean isLeader = true;
        scheduleTasks(list, isLeader);
    }

    /** Invokes {@code nonLeaderCleanUp()} on the generator of every registered task type. */
    @Override
    public void cleanUpTask() {
        LOGGER.info("Cleaning up all task generators");
        for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
            PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
            taskGenerator.nonLeaderCleanUp();
        }
    }

    /**
     * Returns the Quartz scheduler, or {@code null} when the constructor found the scheduler disabled in
     * the controller config.
     */
    @Nullable
    public Scheduler getScheduler() {
        return _scheduler;
    }

    /**
     * Publishes, for one task type, how many tasks are currently in each task state.
     *
     * <p>The shared counter map is reset to zero first, so states that were seen in an earlier call but
     * are not present now are published as 0. If the task type is not among the known task types, all
     * previously seen states are published as 0.
     *
     * @param str task type; also the prefix of the gauge name {@code <taskType>.<state>}
     */
    public synchronized void reportMetrics(String str) {
        // Zero the counters in place; the keys are kept on purpose.
        _taskStateToCountMap.replaceAll((taskState, count) -> 0);

        if (_helixTaskResourceManager.getTaskTypes().contains(str)) {
            for (TaskState taskState : _helixTaskResourceManager.getTaskStates(str).values()) {
                _taskStateToCountMap.merge(taskState, 1, Integer::sum);
            }
        }

        // Use the same entry for both the gauge name and its value (the decompiled code read the value
        // from an undeclared variable).
        for (Map.Entry<TaskState, Integer> stateCount : _taskStateToCountMap.entrySet()) {
            String gaugeName = String.format("%s.%s", str, stateCount.getKey());
            _controllerMetrics.setValueOfTableGauge(gaugeName, ControllerGauge.TASK_STATUS, stateCount.getValue().intValue());
        }
    }

    /**
     * Registers a metrics updater as data listener on the task queue path of the given task type, unless
     * one has already been registered for that task type.
     *
     * @param taskType task type to watch
     */
    private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) {
        if (!_taskTypeMetricsUpdaterMap.containsKey(taskType)) {
            TaskTypeMetricsUpdater metricsUpdater = new TaskTypeMetricsUpdater(taskType, this);
            String taskQueuePath = getPropertyStorePathForTaskQueue(taskType);
            _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(taskQueuePath, metricsUpdater);
            _taskTypeMetricsUpdaterMap.put(taskType, metricsUpdater);
        }
    }
}
