package org.apache.pinot.controller.helix.core.rebalance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.zookeeper.data.Stat;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

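/**
 * Unit tests for {@code RebalanceChecker}: retry-delay computation, selection of candidate and latest rebalance
 * jobs, retrying of rebalance jobs, and the controller-job bookkeeping of {@code PinotHelixResourceManager}.
 *
 * <p>Decompiled from: input_file:org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.class
 */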
public class RebalanceCheckerTest {
    /**
     * Checks the bounds of the delay returned by {@code RebalanceChecker.getRetryDelayInMs}.
     *
     * <p>With an initial delay of zero the delay must be zero for attempts 1 to 3. With a positive initial delay
     * {@code d}, the delay for attempt {@code k} (1 to 3) must lie in {@code [d * 2^(k-1), d * 2^k)}.
     */
    @Test
    public void testGetRetryDelayInMs() {
        final int maxAttemptToCheck = 3;

        // A zero initial delay must stay zero no matter which attempt is asked for.
        for (int attemptId = 1; attemptId <= maxAttemptToCheck; attemptId++) {
            Assert.assertEquals(RebalanceChecker.getRetryDelayInMs(0L, attemptId), 0L);
        }

        for (long initialDelayMs : new long[]{1L, 30000L, 3600000L}) {
            // The accepted window starts at the initial delay and doubles with every further attempt.
            long windowStartMs = initialDelayMs;
            for (int attemptId = 1; attemptId <= maxAttemptToCheck; attemptId++) {
                long delayMs = RebalanceChecker.getRetryDelayInMs(initialDelayMs, attemptId);
                Assert.assertTrue(delayMs >= windowStartMs && delayMs < windowStartMs * 2);
                windowStartMs *= 2;
            }
        }
    }

    /**
     * Exercises {@code RebalanceChecker.getCandidateJobs} against several job histories of one table.
     *
     * <p>Fixture:
     * <ul>
     *   <li>job1: initial attempt FAILED, retries job1_2 (FAILED), job1_3 (ABORTED) and job1_4 (FAILED)</li>
     *   <li>job2: initial attempt FAILED, retry job2_2 DONE</li>
     *   <li>job3: a single IN_PROGRESS attempt</li>
     *   <li>job4: a FAILED attempt whose metadata carries no REBALANCE_CONTEXT entry</li>
     * </ul>
     *
     * <p>Expectations: job1 (with all four attempts) and job3 are candidates; job1 drops out once one of its
     * attempts is CANCELLED; no candidate is left once job5, a DONE job with a later start time, is added.
     *
     * @throws Exception if the job metadata cannot be built or parsed
     */
    @Test
    public void testGetCandidateJobs() throws Exception {
        String tableName = "table01";
        Map<String, Map<String, String>> allJobMetadata = new HashMap<>();

        // job1: the initial attempt and all of its retries ended without success.
        RebalanceConfig job1Config = new RebalanceConfig();
        job1Config.setMaxAttempts(4);
        TableRebalanceProgressStats job1Stats = new TableRebalanceProgressStats();
        job1Stats.setStatus(RebalanceResult.Status.FAILED);
        job1Stats.setStartTimeMs(1000L);
        allJobMetadata.put("job1", ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", job1Stats,
                TableRebalanceContext.forInitialAttempt("job1", job1Config)));
        allJobMetadata.put("job1_2",
                createDummyJobMetadata(tableName, "job1", 2, 1100L, RebalanceResult.Status.FAILED));
        allJobMetadata.put("job1_3",
                createDummyJobMetadata(tableName, "job1", 3, 1200L, RebalanceResult.Status.ABORTED));
        allJobMetadata.put("job1_4",
                createDummyJobMetadata(tableName, "job1", 4, 1300L, RebalanceResult.Status.FAILED));

        // job2: the initial attempt failed but its retry completed.
        RebalanceConfig job2Config = new RebalanceConfig();
        job2Config.setMaxAttempts(4);
        TableRebalanceProgressStats job2Stats = new TableRebalanceProgressStats();
        job2Stats.setStatus(RebalanceResult.Status.FAILED);
        job2Stats.setStartTimeMs(2000L);
        allJobMetadata.put("job2", ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", job2Stats,
                TableRebalanceContext.forInitialAttempt("job2", job2Config)));
        allJobMetadata.put("job2_2",
                createDummyJobMetadata(tableName, "job2", 2, 2100L, RebalanceResult.Status.DONE));

        // job3: still IN_PROGRESS.
        RebalanceConfig job3Config = new RebalanceConfig();
        job3Config.setMaxAttempts(4);
        TableRebalanceProgressStats job3Stats = new TableRebalanceProgressStats();
        job3Stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
        job3Stats.setStartTimeMs(3000L);
        Map<String, String> job3Metadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3",
                job3Stats, TableRebalanceContext.forInitialAttempt("job3", job3Config));
        // NOTE(review): presumably an old submission time makes the in-progress job look stuck - confirm
        // against RebalanceChecker.
        job3Metadata.put("submissionTimeMs", "3000");
        allJobMetadata.put("job3", job3Metadata);

        // job4: failed, and its metadata has no rebalance context at all.
        TableRebalanceProgressStats job4Stats = new TableRebalanceProgressStats();
        job4Stats.setStatus(RebalanceResult.Status.FAILED);
        job4Stats.setStartTimeMs(4000L);
        Map<String, String> job4Metadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job4",
                job4Stats, (TableRebalanceContext) null);
        job4Metadata.remove("REBALANCE_CONTEXT");
        allJobMetadata.put("job4", job4Metadata);

        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs =
                RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
        Assert.assertEquals(candidateJobs.size(), 2);
        Assert.assertTrue(candidateJobs.containsKey("job1"));
        Assert.assertTrue(candidateJobs.containsKey("job3"));
        // job1 is listed with all four attempts, job3 with its single attempt.
        Assert.assertEquals(candidateJobs.get("job1").size(), 4);
        Assert.assertEquals(candidateJobs.get("job3").size(), 1);

        // Once one attempt of job1 is CANCELLED, job1 must no longer be a candidate.
        cancelRebalanceJob(allJobMetadata.get("job1_4"));
        candidateJobs = RebalanceChecker.getCandidateJobs(tableName, allJobMetadata);
        Assert.assertEquals(candidateJobs.size(), 1);
        Assert.assertTrue(candidateJobs.containsKey("job3"));
        Assert.assertEquals(candidateJobs.get("job3").size(), 1);

        // job5 is DONE and started after every other job; afterwards nothing is left to retry.
        RebalanceConfig job5Config = new RebalanceConfig();
        job5Config.setMaxAttempts(4);
        TableRebalanceProgressStats job5Stats = new TableRebalanceProgressStats();
        job5Stats.setStatus(RebalanceResult.Status.DONE);
        job5Stats.setStartTimeMs(5000L);
        allJobMetadata.put("job5", ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job5", job5Stats,
                TableRebalanceContext.forInitialAttempt("job5", job5Config)));
        Assert.assertEquals(RebalanceChecker.getCandidateJobs(tableName, allJobMetadata).size(), 0);
    }

    /**
     * Exercises {@code RebalanceChecker.getLatestJob} on a map from original job id to the set of
     * (attempt context, start time) pairs of that job.
     *
     * <p>Every context built by {@code createDummyJobCtx} carries a config with {@code maxAttempts} of 4. The
     * expectations are: the attempt with the greatest start time is returned while its job has fewer than four
     * attempts; a job with four attempts is passed over in favour of another job; and {@code null} is returned
     * when only the four-attempt job remains.
     */
    @Test
    public void testGetLatestJob() {
        Map<String, Set<Pair<TableRebalanceContext, Long>>> candidateJobs = new HashMap<>();

        // job1_3 (start time 1020) is the most recent attempt overall.
        candidateJobs.put("job1", ImmutableSet.of(
                Pair.of(createDummyJobCtx("job1", 1), 10L),
                Pair.of(createDummyJobCtx("job1", 2), 20L),
                Pair.of(createDummyJobCtx("job1", 3), 1020L)));
        candidateJobs.put("job2", ImmutableSet.of(Pair.of(createDummyJobCtx("job2", 1), 1000L)));
        Pair<TableRebalanceContext, Long> latestJob = RebalanceChecker.getLatestJob(candidateJobs);
        Assert.assertNotNull(latestJob);
        Assert.assertEquals(latestJob.getLeft().getJobId(), "job1_3");

        // job1 now has a fourth attempt (start time 2020), yet job2 is expected - presumably because job1 has
        // used up its maxAttempts of 4.
        candidateJobs.put("job1", ImmutableSet.of(
                Pair.of(createDummyJobCtx("job1", 1), 10L),
                Pair.of(createDummyJobCtx("job1", 2), 20L),
                Pair.of(createDummyJobCtx("job1", 3), 1020L),
                Pair.of(createDummyJobCtx("job1", 4), 2020L)));
        latestJob = RebalanceChecker.getLatestJob(candidateJobs);
        Assert.assertNotNull(latestJob);
        Assert.assertEquals(latestJob.getLeft().getJobId(), "job2");

        // A newer job3 (start time 3000) takes over as the latest job.
        candidateJobs.put("job3", ImmutableSet.of(Pair.of(createDummyJobCtx("job3", 1), 3000L)));
        latestJob = RebalanceChecker.getLatestJob(candidateJobs);
        Assert.assertNotNull(latestJob);
        Assert.assertEquals(latestJob.getLeft().getJobId(), "job3");

        // With only the four-attempt job1 left, no job is returned.
        candidateJobs.remove("job2");
        candidateJobs.remove("job3");
        Assert.assertNull(RebalanceChecker.getLatestJob(candidateJobs));
    }

    /**
     * Verifies that {@code RebalanceChecker.retryRebalanceTable} triggers exactly one rebalance and that the
     * retry is issued for job3.
     *
     * <p>Fixture: job1 with four FAILED attempts (the last one started at 5300), job2 whose retry is DONE, and
     * job3 which is IN_PROGRESS with a start and submission time of 3000. The observer captured from the
     * {@code rebalanceTable} call must carry a context with original job id {@code job3} and attempt id 2.
     *
     * @throws Exception if the job metadata cannot be built or the retry fails
     */
    @Test
    public void testRetryRebalance() throws Exception {
        String tableName = "table01";
        LeadControllerManager leadControllerManager = Mockito.mock(LeadControllerManager.class);
        ControllerMetrics controllerMetrics = Mockito.mock(ControllerMetrics.class);
        // The executor comes from MoreExecutors.newDirectExecutorService(); the verification below runs right
        // after retryRebalanceTable returns, without any waiting.
        ListeningExecutorService executorService = MoreExecutors.newDirectExecutorService();
        ControllerConf controllerConf = new ControllerConf();
        Map<String, Map<String, String>> allJobMetadata = new HashMap<>();

        // job1: four FAILED attempts with maxAttempts of 4.
        RebalanceConfig job1Config = new RebalanceConfig();
        job1Config.setMaxAttempts(4);
        TableRebalanceProgressStats job1Stats = new TableRebalanceProgressStats();
        job1Stats.setStatus(RebalanceResult.Status.FAILED);
        job1Stats.setStartTimeMs(1000L);
        allJobMetadata.put("job1", ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", job1Stats,
                TableRebalanceContext.forInitialAttempt("job1", job1Config)));
        allJobMetadata.put("job1_2",
                createDummyJobMetadata(tableName, "job1", 2, 1100L, RebalanceResult.Status.FAILED));
        allJobMetadata.put("job1_3",
                createDummyJobMetadata(tableName, "job1", 3, 1200L, RebalanceResult.Status.FAILED));
        allJobMetadata.put("job1_4",
                createDummyJobMetadata(tableName, "job1", 4, 5300L, RebalanceResult.Status.FAILED));

        // job2: failed first, then completed on retry.
        RebalanceConfig job2Config = new RebalanceConfig();
        job2Config.setMaxAttempts(4);
        TableRebalanceProgressStats job2Stats = new TableRebalanceProgressStats();
        job2Stats.setStatus(RebalanceResult.Status.FAILED);
        job2Stats.setStartTimeMs(2000L);
        allJobMetadata.put("job2", ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job2", job2Stats,
                TableRebalanceContext.forInitialAttempt("job2", job2Config)));
        allJobMetadata.put("job2_2",
                createDummyJobMetadata(tableName, "job2", 2, 2100L, RebalanceResult.Status.DONE));

        // job3: IN_PROGRESS with an old submission time.
        RebalanceConfig job3Config = new RebalanceConfig();
        job3Config.setMaxAttempts(4);
        TableRebalanceProgressStats job3Stats = new TableRebalanceProgressStats();
        job3Stats.setStatus(RebalanceResult.Status.IN_PROGRESS);
        job3Stats.setStartTimeMs(3000L);
        Map<String, String> job3Metadata = ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job3",
                job3Stats, TableRebalanceContext.forInitialAttempt("job3", job3Config));
        job3Metadata.put("submissionTimeMs", "3000");
        allJobMetadata.put("job3", job3Metadata);

        TableConfig tableConfig = Mockito.mock(TableConfig.class);
        PinotHelixResourceManager resourceManager = Mockito.mock(PinotHelixResourceManager.class);
        Mockito.when(resourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
        Mockito.when(resourceManager.getAllJobs(ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(allJobMetadata);

        RebalanceChecker checker = new RebalanceChecker(resourceManager, leadControllerManager, controllerConf,
                controllerMetrics, executorService);
        checker.retryRebalanceTable(tableName, allJobMetadata);

        // Exactly one rebalance must have been started; capture its observer to inspect the retry context.
        ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
                ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
        Mockito.verify(resourceManager, Mockito.times(1)).rebalanceTable(ArgumentMatchers.eq(tableName),
                ArgumentMatchers.<TableConfig>any(), ArgumentMatchers.anyString(),
                ArgumentMatchers.<RebalanceConfig>any(), observerCaptor.capture());
        TableRebalanceContext retryContext = observerCaptor.getValue().getTableRebalanceContext();
        Assert.assertEquals(retryContext.getOriginalJobId(), "job3");
        Assert.assertEquals(retryContext.getAttemptId(), 2);
    }

    /**
     * Verifies that {@code RebalanceChecker.retryRebalanceTable} respects the retry delay of the failed job.
     *
     * <p>A single FAILED job whose start time is "now" is checked twice: first with the default
     * {@code RebalanceConfig} retry delay, where no rebalance may be triggered, and then after the initial retry
     * delay is set to 0, where exactly one rebalance must be triggered.
     *
     * @throws Exception if the job metadata cannot be built or the retry fails
     */
    @Test
    public void testRetryRebalanceWithBackoff() throws Exception {
        String tableName = "table01";
        LeadControllerManager leadControllerManager = Mockito.mock(LeadControllerManager.class);
        ControllerMetrics controllerMetrics = Mockito.mock(ControllerMetrics.class);
        ListeningExecutorService executorService = MoreExecutors.newDirectExecutorService();
        ControllerConf controllerConf = new ControllerConf();
        Map<String, Map<String, String>> allJobMetadata = new HashMap<>();

        // A job that failed just now.
        RebalanceConfig jobConfig = new RebalanceConfig();
        jobConfig.setMaxAttempts(4);
        long nowMs = System.currentTimeMillis();
        TableRebalanceProgressStats jobStats = new TableRebalanceProgressStats();
        jobStats.setStatus(RebalanceResult.Status.FAILED);
        jobStats.setStartTimeMs(nowMs);
        TableRebalanceContext jobContext = TableRebalanceContext.forInitialAttempt("job1", jobConfig);
        allJobMetadata.put("job1",
                ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", jobStats, jobContext));

        PinotHelixResourceManager resourceManager = Mockito.mock(PinotHelixResourceManager.class);
        Mockito.when(resourceManager.getTableConfig(tableName)).thenReturn(Mockito.mock(TableConfig.class));
        RebalanceChecker checker = new RebalanceChecker(resourceManager, leadControllerManager, controllerConf,
                controllerMetrics, executorService);

        // NOTE(review): presumably the default initial retry delay is positive, so the backoff has not elapsed
        // yet - confirm against RebalanceConfig.
        checker.retryRebalanceTable(tableName, allJobMetadata);
        Mockito.verify(resourceManager, Mockito.never()).rebalanceTable(ArgumentMatchers.eq(tableName),
                ArgumentMatchers.<TableConfig>any(), ArgumentMatchers.anyString(),
                ArgumentMatchers.<RebalanceConfig>any(), ArgumentMatchers.<ZkBasedTableRebalanceObserver>any());

        // Drop the delay to zero and rebuild the metadata so it reflects the changed config; the retry must
        // now go through.
        jobConfig.setRetryInitialDelayInMs(0L);
        allJobMetadata.put("job1",
                ZkBasedTableRebalanceObserver.createJobMetadata(tableName, "job1", jobStats, jobContext));
        checker.retryRebalanceTable(tableName, allJobMetadata);
        Mockito.verify(resourceManager, Mockito.times(1)).rebalanceTable(ArgumentMatchers.eq(tableName),
                ArgumentMatchers.<TableConfig>any(), ArgumentMatchers.anyString(),
                ArgumentMatchers.<RebalanceConfig>any(), ArgumentMatchers.<ZkBasedTableRebalanceObserver>any());
    }

    /**
     * Exercises {@code PinotHelixResourceManager.addControllerJobToZK} and {@code updateJobsForTable} against a
     * mocked Helix property store.
     *
     * <p>The mocked store returns one shared {@link ZNRecord} for the TABLE_REBALANCE job path, and the
     * assertions read that same record. Four jobs are added; the checker passed along with job2 returns
     * {@code false} and job2 is expected to be absent from the record. {@code updateJobsForTable} is then
     * expected to visit only the jobs of the requested table.
     */
    @Test
    public void testAddUpdateControllerJobsForTable() {
        String jobType = "TABLE_REBALANCE";
        ControllerConf controllerConf = new ControllerConf();
        controllerConf.setZkStr("localhost:2181");
        controllerConf.setHelixClusterName("cluster01");
        PinotHelixResourceManager resourceManager = new PinotHelixResourceManager(controllerConf);

        HelixManager helixManager = Mockito.mock(HelixManager.class);
        // Mockito.mock(Class) yields the raw type; the mock is only ever used as a store of ZNRecord.
        @SuppressWarnings("unchecked")
        ZkHelixPropertyStore<ZNRecord> propertyStore = Mockito.mock(ZkHelixPropertyStore.class);
        String jobsPath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
        ZNRecord jobsRecord = new ZNRecord("jobs");
        Mockito.when(propertyStore.get(ArgumentMatchers.eq(jobsPath), ArgumentMatchers.<Stat>any(),
                ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenReturn(jobsRecord);
        Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(Mockito.mock(HelixAdmin.class));
        Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
        Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(Mockito.mock(HelixDataAccessor.class));
        resourceManager.start(helixManager, (ControllerMetrics) null);

        // Only the checker passed with job2 returns false.
        resourceManager.addControllerJobToZK("job1",
                ImmutableMap.of("jobId", "job1", "submissionTimeMs", "1000", "tableName", "table01"), jobType,
                prevJobMetadata -> true);
        resourceManager.addControllerJobToZK("job2",
                ImmutableMap.of("jobId", "job2", "submissionTimeMs", "2000", "tableName", "table01"), jobType,
                prevJobMetadata -> false);
        resourceManager.addControllerJobToZK("job3",
                ImmutableMap.of("jobId", "job3", "submissionTimeMs", "3000", "tableName", "table02"), jobType,
                prevJobMetadata -> true);
        resourceManager.addControllerJobToZK("job4",
                ImmutableMap.of("jobId", "job4", "submissionTimeMs", "4000", "tableName", "table02"), jobType,
                prevJobMetadata -> true);

        Map<String, Map<String, String>> storedJobs = jobsRecord.getMapFields();
        Assert.assertEquals(storedJobs.size(), 3);
        Assert.assertTrue(storedJobs.containsKey("job1"));
        Assert.assertTrue(storedJobs.containsKey("job3"));
        Assert.assertTrue(storedJobs.containsKey("job4"));

        // The updater is used here only to record which job ids it is invoked for.
        Set<String> table01JobIds = new HashSet<>();
        resourceManager.updateJobsForTable("table01", jobType,
                jobMetadata -> table01JobIds.add(jobMetadata.get("jobId")));
        Assert.assertEquals(table01JobIds.size(), 1);
        Assert.assertTrue(table01JobIds.contains("job1"));

        Set<String> table02JobIds = new HashSet<>();
        resourceManager.updateJobsForTable("table02", jobType,
                jobMetadata -> table02JobIds.add(jobMetadata.get("jobId")));
        Assert.assertEquals(table02JobIds.size(), 2);
        Assert.assertTrue(table02JobIds.contains("job3"));
        Assert.assertTrue(table02JobIds.contains("job4"));
    }

    /**
     * Builds a {@link TableRebalanceContext} for one attempt of a rebalance job.
     *
     * <p>The job id equals {@code originalJobId} for attempt 1 (or lower) and {@code originalJobId_attemptId}
     * for later attempts. The attached {@link RebalanceConfig} has {@code maxAttempts} set to 4.
     *
     * @param originalJobId id of the first attempt of the job
     * @param attemptId attempt number to store in the context
     * @return the populated context
     */
    private static TableRebalanceContext createDummyJobCtx(String originalJobId, int attemptId) {
        RebalanceConfig config = new RebalanceConfig();
        config.setMaxAttempts(4);

        // Only attempts after the first one carry the "_<attempt>" suffix in their job id.
        String jobId = attemptId > 1 ? originalJobId + "_" + attemptId : originalJobId;

        TableRebalanceContext context = new TableRebalanceContext();
        context.setJobId(jobId);
        context.setOriginalJobId(originalJobId);
        context.setConfig(config);
        context.setAttemptId(attemptId);
        return context;
    }

    /**
     * Builds the job metadata of a retry attempt via {@code ZkBasedTableRebalanceObserver.createJobMetadata}.
     *
     * <p>The metadata is created for job id {@code originalJobId_attemptId}, with a retry context obtained from
     * {@code TableRebalanceContext.forRetry} and a {@link RebalanceConfig} whose {@code maxAttempts} is 4.
     *
     * @param tableName table the job belongs to
     * @param originalJobId id of the first attempt of the job
     * @param attemptId attempt number of this retry
     * @param startTimeMs start time recorded in the progress stats
     * @param status status recorded in the progress stats
     * @return the job metadata map
     */
    private static Map<String, String> createDummyJobMetadata(String tableName, String originalJobId, int attemptId,
            long startTimeMs, RebalanceResult.Status status) {
        RebalanceConfig config = new RebalanceConfig();
        config.setMaxAttempts(4);

        TableRebalanceProgressStats progressStats = new TableRebalanceProgressStats();
        progressStats.setStatus(status);
        progressStats.setStartTimeMs(startTimeMs);

        String retryJobId = originalJobId + "_" + attemptId;
        TableRebalanceContext retryContext = TableRebalanceContext.forRetry(originalJobId, config, attemptId);
        return ZkBasedTableRebalanceObserver.createJobMetadata(tableName, retryJobId, progressStats, retryContext);
    }

    /**
     * Marks a job as cancelled by rewriting the status inside its serialized progress stats.
     *
     * <p>The {@code REBALANCE_PROGRESS_STATS} entry is parsed, its status is set to CANCELLED, and the entry is
     * replaced in place with the re-serialized stats.
     *
     * @param jobMetadata job metadata to modify; must contain a {@code REBALANCE_PROGRESS_STATS} entry
     * @throws JsonProcessingException if the stats cannot be parsed or serialized
     */
    private static void cancelRebalanceJob(Map<String, String> jobMetadata) throws JsonProcessingException {
        final String statsKey = "REBALANCE_PROGRESS_STATS";
        String statsJson = jobMetadata.get(statsKey);
        TableRebalanceProgressStats progressStats =
                JsonUtils.stringToObject(statsJson, TableRebalanceProgressStats.class);
        progressStats.setStatus(RebalanceResult.Status.CANCELLED);
        String cancelledStatsJson = JsonUtils.objectToString(progressStats);
        jobMetadata.put(statsKey, cancelledStatsJson);
    }
}
