package org.apache.pinot.controller.helix.core.periodictask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.class */
public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ControllerPeriodicTask.class);
    protected final PinotHelixResourceManager _pinotHelixResourceManager;
    protected final LeadControllerManager _leadControllerManager;
    protected final ControllerMetrics _controllerMetrics;
    protected Set<String> _prevLeaderOfTables;

    public ControllerPeriodicTask(String str, long j, long j2, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
        super(str, j, j2);
        this._prevLeaderOfTables = new HashSet();
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._leadControllerManager = leadControllerManager;
        this._controllerMetrics = controllerMetrics;
    }

    @Override // org.apache.pinot.core.periodictask.BasePeriodicTask
    protected final void runTask(Properties properties) {
        this._controllerMetrics.addMeteredTableValue(this._taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN, 1L);
        try {
            String str = (String) properties.get(PeriodicTask.PROPERTY_KEY_TABLE_NAME);
            Stream<String> stream = (str == null ? this._pinotHelixResourceManager.getAllTables() : Collections.singletonList(str)).stream();
            LeadControllerManager leadControllerManager = this._leadControllerManager;
            Objects.requireNonNull(leadControllerManager);
            Set<String> set = (Set) stream.filter(leadControllerManager::isLeaderForTable).collect(Collectors.toSet());
            if (!set.isEmpty()) {
                processTables(new ArrayList(set), properties);
            }
            Sets.SetView difference = Sets.difference(this._prevLeaderOfTables, set);
            if (!difference.isEmpty()) {
                nonLeaderCleanup(new ArrayList(difference));
            }
            this._prevLeaderOfTables = set;
        } catch (Exception e) {
            LOGGER.error("Caught exception while running task: {}", this._taskName, e);
            this._controllerMetrics.addMeteredTableValue(this._taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
        }
    }

    public final ControllerMetrics getControllerMetrics() {
        return this._controllerMetrics;
    }

    protected void processTables(List<String> list, Properties properties) {
        int size = list.size();
        LOGGER.info("Processing {} tables in task: {}", Integer.valueOf(size), this._taskName);
        C preprocess = preprocess(properties);
        int i = 0;
        Iterator<String> it2 = list.iterator();
        while (true) {
            if (!it2.hasNext()) {
                break;
            }
            String next = it2.next();
            if (!isStarted()) {
                LOGGER.info("Task: {} is stopped, early terminate the task", this._taskName);
                break;
            }
            try {
                processTable(next, preprocess);
            } catch (Exception e) {
                LOGGER.error("Caught exception while processing table: {} in task: {}", next, this._taskName, e);
                this._controllerMetrics.addMeteredTableValue(next + "." + this._taskName, ControllerMeter.PERIODIC_TASK_ERROR, 1L);
            }
            i++;
        }
        postprocess(preprocess);
        this._controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, this._taskName, i);
        LOGGER.info("Finish processing {}/{} tables in task: {}", Integer.valueOf(i), Integer.valueOf(size), this._taskName);
    }

    protected C preprocess(Properties properties) {
        return null;
    }

    protected void processTable(String str, C c) {
        processTable(str);
    }

    protected void processTable(String str) {
    }

    protected void postprocess(C c) {
        postprocess();
    }

    protected void postprocess() {
    }

    protected void nonLeaderCleanup(List<String> list) {
    }
}
