package org.apache.pinot.integration.tests;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.MetricValueUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.class */
public class SimpleMinionClusterIntegrationTest extends ClusterTest {
    public static final String TASK_TYPE = "TestTask";
    public static final String TABLE_NAME_1 = "testTable1";
    public static final String TABLE_NAME_2 = "testTable2";
    public static final String TABLE_NAME_3 = "testTable3";
    public static final int NUM_TASKS = 2;
    public static final int NUM_CONFIGS = 4;
    public static final AtomicBoolean HOLD = new AtomicBoolean();
    public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
    public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean();
    public static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean();
    public static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean();
    private static final long STATE_TRANSITION_TIMEOUT_MS = 60000;
    private static final long ZK_CALLBACK_TIMEOUT_MS = 30000;
    private PinotHelixTaskResourceManager _helixTaskResourceManager;
    private PinotTaskManager _taskManager;

    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        startBroker();
        startServer();
        startMinion();
        PinotHelixResourceManager helixResourceManager = this._controllerStarter.getHelixResourceManager();
        HashMap hashMap = new HashMap();
        hashMap.put("TestTask.timeoutMs", Long.toString(600000L));
        hashMap.put("TestTask.maxAttemptsPerTask", "2");
        helixResourceManager.getHelixAdmin().setConfig(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(helixResourceManager.getHelixClusterName()).build(), hashMap);
        addDummySchema(TABLE_NAME_1);
        addDummySchema(TABLE_NAME_2);
        addDummySchema(TABLE_NAME_3);
        TableTaskConfig tableTaskConfig = new TableTaskConfig(Collections.singletonMap(TASK_TYPE, Collections.emptyMap()));
        addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setTaskConfig(tableTaskConfig).build());
        addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setTaskConfig(tableTaskConfig).build());
        addTableConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_3).build());
        this._helixTaskResourceManager = this._controllerStarter.getHelixTaskResourceManager();
        this._taskManager = this._controllerStarter.getTaskManager();
    }

    @Test
    public void testTaskTimeout() {
        PinotTaskGenerator taskGenerator = this._taskManager.getTaskGeneratorRegistry().getTaskGenerator(TASK_TYPE);
        Assert.assertNotNull(taskGenerator);
        Assert.assertEquals(taskGenerator.getTaskTimeoutMs(), 600000L);
    }

    @Test
    public void testTaskMaxAttempts() {
        PinotTaskGenerator taskGenerator = this._taskManager.getTaskGeneratorRegistry().getTaskGenerator(TASK_TYPE);
        Assert.assertNotNull(taskGenerator);
        Assert.assertEquals(taskGenerator.getMaxAttemptsPerTask(), 2);
    }

    private void verifyTaskCount(String str, int i, int i2, int i3, int i4) {
        TestUtils.waitForCondition(r9 -> {
            PinotHelixTaskResourceManager.TaskCount taskCount = this._helixTaskResourceManager.getTaskCount(str);
            return Boolean.valueOf(taskCount.getError() == i && taskCount.getWaiting() == i2 && taskCount.getRunning() == i3 && taskCount.getTotal() == i4);
        }, 10000L, "Failed to reach expected task count");
    }

    @Test
    public void testStopResumeDeleteTaskQueue() {
        HOLD.set(true);
        Assert.assertEquals(this._helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
        List scheduledTaskNames = ((TaskSchedulingInfo) this._taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE)).getScheduledTaskNames();
        Assert.assertNotNull(scheduledTaskNames);
        Assert.assertEquals(scheduledTaskNames.size(), 1);
        Assert.assertTrue(this._helixTaskResourceManager.getTaskQueues().contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
        Assert.assertTrue(this._helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(scheduledTaskNames.get(0)));
        verifyTaskCount((String) scheduledTaskNames.get(0), 0, 1, 1, 2);
        List scheduledTaskNames2 = ((TaskSchedulingInfo) this._taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE)).getScheduledTaskNames();
        Assert.assertNotNull(scheduledTaskNames2);
        Assert.assertEquals(scheduledTaskNames2.size(), 1);
        Assert.assertTrue(this._helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(scheduledTaskNames2.get(0)));
        verifyTaskCount((String) scheduledTaskNames2.get(0), 0, 2, 0, 2);
        MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext(), this._taskManager);
        TestUtils.waitForCondition(r4 -> {
            Collection values = this._helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
            Assert.assertEquals(values.size(), 2);
            Iterator it = values.iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.IN_PROGRESS) {
                    return false;
                }
            }
            Assert.assertFalse(TASK_SUCCESS_NOTIFIED.get());
            Assert.assertFalse(TASK_CANCELLED_NOTIFIED.get());
            Assert.assertFalse(TASK_ERROR_NOTIFIED.get());
            return Boolean.valueOf(TASK_START_NOTIFIED.get());
        }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");
        ControllerMetrics controllerMetrics = this._controllerStarter.getControllerMetrics();
        String str = "TestTask." + String.valueOf(TaskState.IN_PROGRESS);
        String str2 = "TestTask." + String.valueOf(TaskState.STOPPED);
        String str3 = "TestTask." + String.valueOf(TaskState.COMPLETED);
        TestUtils.waitForCondition(r9 -> {
            return Boolean.valueOf(MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str, ControllerGauge.TASK_STATUS) == 2 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str2, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str3, ControllerGauge.TASK_STATUS) == 0);
        }, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
        this._helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
        TestUtils.waitForCondition(r42 -> {
            Collection values = this._helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
            Assert.assertEquals(values.size(), 2);
            Iterator it = values.iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.STOPPED) {
                    return false;
                }
            }
            Assert.assertTrue(TASK_START_NOTIFIED.get());
            Assert.assertFalse(TASK_SUCCESS_NOTIFIED.get());
            Assert.assertFalse(TASK_ERROR_NOTIFIED.get());
            return Boolean.valueOf(TASK_CANCELLED_NOTIFIED.get());
        }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
        TestUtils.waitForCondition(r92 -> {
            return Boolean.valueOf(MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str2, ControllerGauge.TASK_STATUS) == 2 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str3, ControllerGauge.TASK_STATUS) == 0);
        }, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
        Assert.assertTrue(this._helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(scheduledTaskNames.get(0)));
        this._helixTaskResourceManager.deleteTask((String) scheduledTaskNames.get(0), false);
        this._helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
        HOLD.set(false);
        TestUtils.waitForCondition(r6 -> {
            Collection values = this._helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
            Iterator it = values.iterator();
            while (it.hasNext()) {
                if (((TaskState) it.next()) != TaskState.COMPLETED) {
                    return false;
                }
            }
            Assert.assertFalse(this._helixTaskResourceManager.getTaskStates(TASK_TYPE).containsKey(scheduledTaskNames.get(0)));
            Assert.assertEquals(values.size(), 1);
            Assert.assertTrue(TASK_START_NOTIFIED.get());
            Assert.assertTrue(TASK_SUCCESS_NOTIFIED.get());
            Assert.assertTrue(TASK_CANCELLED_NOTIFIED.get());
            Assert.assertFalse(TASK_ERROR_NOTIFIED.get());
            return true;
        }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
        TestUtils.waitForCondition(r93 -> {
            return Boolean.valueOf(MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str2, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str3, ControllerGauge.TASK_STATUS) == 1);
        }, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
        this._helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
        TestUtils.waitForCondition(r43 -> {
            return Boolean.valueOf(!this._helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE));
        }, STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
        TestUtils.waitForCondition(r94 -> {
            return Boolean.valueOf(MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str2, ControllerGauge.TASK_STATUS) == 0 && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, str3, ControllerGauge.TASK_STATUS) == 0);
        }, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
    }

    @AfterClass
    public void tearDown() throws Exception {
        dropOfflineTable(TABLE_NAME_1);
        dropOfflineTable(TABLE_NAME_2);
        dropOfflineTable(TABLE_NAME_3);
        stopMinion();
        stopServer();
        stopBroker();
        stopController();
        stopZk();
    }
}
