package org.apache.pinot.core.query.scheduler;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ResultsBlockStreamer;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

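/**
 * Tests for {@link PriorityScheduler}: start/stop behavior, accounting for a single query, concurrent
 * submission from several threads, and the error responses returned when the scheduler is not running.
 * Queries are executed by a stub {@link QueryExecutor} that can be synchronized with the test thread.
 */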
public class PrioritySchedulerTest {
    /** Server metrics instance shared by every scheduler and query request created in this class. */
    private static final ServerMetrics METRICS = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());

    /** When {@code true}, {@link TestQueryExecutor#execute} waits on both barriers declared below. */
    private static boolean _useBarrier = false;

    /** Awaited by the stub executor before it builds its response (only when {@link #_useBarrier} is set). */
    private static CyclicBarrier _startupBarrier;

    /** Awaited by the stub executor after it builds its response (only when {@link #_useBarrier} is set). */
    private static CyclicBarrier _validationBarrier;

    /** Counted down once per query run by the stub executor; replaced with a count of 1 after each test. */
    private static CountDownLatch _numQueries = new CountDownLatch(1);

    /* loaded from: input_file:org/apache/pinot/core/query/scheduler/PrioritySchedulerTest$TestPriorityScheduler.class */
    /**
     * {@link PriorityScheduler} subclass used by the tests. {@link #create(PinotConfiguration)} wires it to a
     * {@link PolicyBasedResourceManager}, a {@link TestQueryExecutor} and a {@link MultiLevelPriorityQueue},
     * and the accessors below expose the scheduler's internals to the test methods.
     *
     * <p>The group factory and the latest-query-time accumulator live in static fields that every call to
     * {@code create} overwrites, so they always refer to the most recently created scheduler.
     */
    static class TestPriorityScheduler extends PriorityScheduler {
        /** Group factory handed to the queue of the most recently created scheduler. */
        static TestSchedulerGroupFactory _groupFactory;
        /** Accumulator (max, initial value 0) handed to the most recently created scheduler. */
        static LongAccumulator _latestQueryTime;

        /** Passes every argument straight through to the {@link PriorityScheduler} constructor. */
        public TestPriorityScheduler(PinotConfiguration config, ResourceManager resourceManager,
                QueryExecutor queryExecutor, SchedulerPriorityQueue queue, ServerMetrics metrics,
                LongAccumulator latestQueryTime) {
            super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
        }

        /** Builds a scheduler from an empty {@link PinotConfiguration}. */
        public static TestPriorityScheduler create() {
            return create(new PinotConfiguration());
        }

        /**
         * Builds a scheduler and its collaborators from the given configuration.
         *
         * @param pinotConfiguration configuration passed to the resource manager, the queue and the scheduler
         * @return a new, not yet started scheduler
         */
        public static TestPriorityScheduler create(PinotConfiguration pinotConfiguration) {
            PolicyBasedResourceManager resources = new PolicyBasedResourceManager(pinotConfiguration);
            TestQueryExecutor executor = new TestQueryExecutor();
            _groupFactory = new TestSchedulerGroupFactory();
            TableBasedGroupMapper groupMapper = new TableBasedGroupMapper();
            MultiLevelPriorityQueue queue =
                    new MultiLevelPriorityQueue(pinotConfiguration, resources, _groupFactory, groupMapper);
            _latestQueryTime = new LongAccumulator(Long::max, 0L);
            return new TestPriorityScheduler(pinotConfiguration, resources, executor, queue,
                    PrioritySchedulerTest.METRICS, _latestQueryTime);
        }

        /** @return the fixed scheduler name {@code "TestScheduler"} */
        public String name() {
            return "TestScheduler";
        }

        /** @return the resource manager held by this scheduler */
        ResourceManager getResourceManager() {
            return _resourceManager;
        }

        /** @return the scheduler's running-queries semaphore */
        public Semaphore getRunningQueriesSemaphore() {
            return _runningQueriesSemaphore;
        }

        /** @return the scheduler's {@code _scheduler} thread */
        Thread getSchedulerThread() {
            return _scheduler;
        }

        /** @return the scheduler's query queue */
        SchedulerPriorityQueue getQueue() {
            return _queryQueue;
        }

        /** @return the current value of the static {@link #_latestQueryTime} accumulator */
        public long getLatestQueryTime() {
            return _latestQueryTime.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pinot/core/query/scheduler/PrioritySchedulerTest$TestQueryExecutor.class */
    /**
     * Stub {@link QueryExecutor}. Its lifecycle methods do nothing; {@link #execute} returns a response block
     * tagged with the request's table name and counts down {@code _numQueries}. When {@code _useBarrier} is
     * set, it additionally waits on {@code _startupBarrier} before building the response and on
     * {@code _validationBarrier} afterwards.
     */
    public static class TestQueryExecutor implements QueryExecutor {
        TestQueryExecutor() {
        }

        /** No-op. */
        public void init(PinotConfiguration pinotConfiguration, InstanceDataManager instanceDataManager, ServerMetrics serverMetrics) {
        }

        /** No-op. */
        public void start() {
        }

        /** No-op. */
        public void shutDown() {
        }

        /**
         * Produces a response whose {@code TABLE} metadata entry is the request's table name with type.
         *
         * @param serverQueryRequest request whose table name is echoed back
         * @param executorService unused
         * @param resultsBlockStreamer unused, may be {@code null}
         * @return the freshly built response block
         * @throws RuntimeException wrapping any exception raised while waiting on a barrier
         */
        public InstanceResponseBlock execute(ServerQueryRequest serverQueryRequest, ExecutorService executorService, @Nullable ResultsBlockStreamer resultsBlockStreamer) {
            if (PrioritySchedulerTest._useBarrier) {
                awaitOrRethrow(PrioritySchedulerTest._startupBarrier);
            }

            InstanceResponseBlock response = new InstanceResponseBlock();
            response.addMetadata(DataTable.MetadataKey.TABLE.getName(), serverQueryRequest.getTableNameWithType());

            // The flag is deliberately re-read here rather than cached from the first check.
            if (PrioritySchedulerTest._useBarrier) {
                awaitOrRethrow(PrioritySchedulerTest._validationBarrier);
            }

            PrioritySchedulerTest._numQueries.countDown();
            return response;
        }

        /** Waits on the barrier, converting any exception into an unchecked {@link RuntimeException}. */
        private static void awaitOrRethrow(CyclicBarrier barrier) {
            try {
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Restores the shared static test state after every test method: barrier synchronization is switched
     * off, both barriers are dropped, and the query latch is replaced with a fresh latch of count 1.
     */
    @AfterMethod
    public void afterMethod() {
        // The flag is cleared before the barriers are nulled; the stub executor checks it before touching them.
        _useBarrier = false;
        _startupBarrier = null;
        _validationBarrier = null;
        // testMultiThreaded installs a larger latch; go back to the default single-count latch.
        _numQueries = new CountDownLatch(1);
    }

    /**
     * Starts a scheduler, stops it, and checks that the scheduler thread is no longer alive once one queue
     * wake-up interval plus a 10 ms margin has elapsed.
     */
    @Test
    public void testStartStop() throws InterruptedException {
        TestPriorityScheduler scheduler = TestPriorityScheduler.create();
        scheduler.start();
        // NOTE(review): presumably gives the scheduler thread time to finish starting — confirm.
        Thread.sleep(100L);
        scheduler.stop();

        // Wait 10 ms, plus the queue's wake-up time converted to milliseconds when it is at least 1 ms.
        long wakeupMicros = scheduler.getQueue().getWakeupTimeMicros();
        long waitMillis = 10L;
        if (wakeupMicros >= 1000) {
            waitMillis += wakeupMicros / 1000;
        }
        Thread.sleep(waitMillis);

        Assert.assertFalse(scheduler.getSchedulerThread().isAlive());
    }

    /**
     * Submits two queries for table "1" to a running scheduler (adding reserved threads and an extra queued
     * request directly on the table's scheduler group in between), stops the scheduler, and verifies that at
     * least one of the two responses carries {@code SERVER_SCHEDULER_DOWN_ERROR}.
     *
     * <p>The scheduler runs with the default configuration.
     */
    @Test
    public void testStartStopQueries() throws ExecutionException, InterruptedException, IOException {
        TestPriorityScheduler scheduler = TestPriorityScheduler.create();
        scheduler.start();

        ArrayList<ListenableFuture<byte[]>> responses = new ArrayList<>();
        responses.add(scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS)));
        // The group for table "1" is looked up only after the first submit.
        TestSchedulerGroup group = TestPriorityScheduler._groupFactory._groupMap.get("1");
        group.addReservedThreads(10);
        group.addLast(TestHelper.createQueryRequest("1", METRICS));
        responses.add(scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS)));

        scheduler.stop();
        // Wait one queue wake-up interval (converted to ms) plus a 10 ms margin before reading the responses.
        long wakeupMicros = scheduler.getQueue().getWakeupTimeMicros();
        Thread.sleep(wakeupMicros >= 1000 ? (wakeupMicros / 1000) + 10 : 10L);

        Integer schedulerDownCode = Integer.valueOf(QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode());
        int schedulerDownResponses = 0;
        for (ListenableFuture<byte[]> response : responses) {
            DataTable dataTable = DataTableFactory.getDataTable(response.get());
            if (dataTable.getExceptions().containsKey(schedulerDownCode)) {
                schedulerDownResponses++;
            }
        }
        Assert.assertTrue(schedulerDownResponses > 0,
                "Expected at least one response to report SERVER_SCHEDULER_DOWN_ERROR");
    }

    /**
     * Runs a single query for table "1" with barrier synchronization enabled. While the stub executor is held
     * between the two barriers, the test inspects the group's running/thread counters; after the response is
     * received it checks that the counters are back to zero, that the response names table "1", and that the
     * latest query time has been recorded.
     */
    @Test
    public void testOneQuery() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
        HashMap<String, Object> properties = new HashMap<>();
        properties.put("threads_per_query_pct", 50);
        properties.put("table_threads_hard_limit_pct", 40);
        properties.put("table_threads_soft_limit_pct", 20);

        // Two parties per barrier: this test thread and the thread running the stub executor.
        _useBarrier = true;
        _startupBarrier = new CyclicBarrier(2);
        _validationBarrier = new CyclicBarrier(2);

        TestPriorityScheduler scheduler = TestPriorityScheduler.create(new PinotConfiguration(properties));
        int permitsBeforeStart = scheduler.getRunningQueriesSemaphore().availablePermits();
        scheduler.start();
        ListenableFuture<byte[]> pendingResponse = scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS));

        // Rendezvous with the executor: from here until the validation barrier the query is in flight.
        _startupBarrier.await();
        TestSchedulerGroup group = TestPriorityScheduler._groupFactory._groupMap.get("1");
        Assert.assertEquals(group.numRunning(), 1);
        Assert.assertEquals(group.getThreadsInUse(), 1);
        Assert.assertTrue(group.totalReservedThreads() <= 2);

        // Let the executor finish, then wait for the serialized response.
        _validationBarrier.await();
        DataTable response = DataTableFactory.getDataTable(pendingResponse.get());
        String reportedTable = response.getMetadata().get(DataTable.MetadataKey.TABLE.getName());
        Assert.assertEquals(reportedTable, "1");

        // All per-group accounting must have been released once the query completed.
        Assert.assertEquals(group.numPending(), 0);
        Assert.assertEquals(group.getThreadsInUse(), 0);
        Assert.assertEquals(group.totalReservedThreads(), 0);
        // NOTE(review): one permit fewer than before start() — presumably held by the scheduler thread; confirm.
        Assert.assertEquals(scheduler.getRunningQueriesSemaphore().availablePermits(), permitsBeforeStart - 1);

        long latestQueryTime = scheduler.getLatestQueryTime();
        Assert.assertTrue(latestQueryTime > 0 && latestQueryTime <= System.currentTimeMillis());
        scheduler.stop();
    }

    /**
     * Submits 30 queries from three concurrent threads (ten queries each, one table per thread, with a random
     * pause of under 100 ms between submissions) and verifies that the stub executor runs all of them.
     *
     * <p>The wait for completion is bounded so that a lost query fails the test instead of hanging it.
     */
    @Test
    public void testMultiThreaded() throws InterruptedException {
        final int submitterCount = 3;
        final int queriesPerSubmitter = 10;

        HashMap<String, Object> properties = new HashMap<>();
        properties.put("query_worker_threads", 60);
        properties.put("query_runner_threads", 20);
        properties.put("threads_per_query_pct", 50);
        properties.put("table_threads_hard_limit_pct", 60);
        properties.put("table_threads_soft_limit_pct", 40);
        properties.put("max_pending_per_group", 10);
        final TestPriorityScheduler scheduler = TestPriorityScheduler.create(new PinotConfiguration(properties));
        scheduler.start();

        final Random random = new Random();
        // Futures are retained only so the submissions are not discarded; their contents are not inspected.
        final ConcurrentLinkedQueue<ListenableFuture<byte[]>> responses = new ConcurrentLinkedQueue<>();
        // The stub executor counts this latch down once per executed query.
        _numQueries = new CountDownLatch(submitterCount * queriesPerSubmitter);

        for (int i = 0; i < submitterCount; i++) {
            final String tableName = Integer.toString(i);
            new Thread(() -> {
                for (int j = 0; j < queriesPerSubmitter; j++) {
                    responses.add(scheduler.submit(TestHelper.createServerQueryRequest(tableName, METRICS)));
                    Uninterruptibles.sleepUninterruptibly(random.nextInt(100), TimeUnit.MILLISECONDS);
                }
            }).start();
        }

        // Bounded wait: the submitters need at most about a second, so 60 s is a generous safety limit.
        boolean allExecuted = _numQueries.await(60L, TimeUnit.SECONDS);
        // Stop the scheduler before asserting so that it is shut down even when the assertion fails.
        scheduler.stop();
        Assert.assertTrue(allExecuted,
                "Timed out waiting for queries to execute; remaining: " + _numQueries.getCount());
    }

    /**
     * Disabled test. With one pending query allowed per group, it submits two queries for table "1" (adding
     * reserved threads and an extra queued request on the group in between) and expects the second response's
     * metadata to contain the key {@code "Exception" + SERVER_OUT_OF_CAPACITY_ERROR code}.
     *
     * <p>NOTE(review): the other tests in this class look up error codes via {@code getExceptions()}, whereas
     * this one inspects {@code getMetadata()} — confirm which is correct before re-enabling.
     */
    @Test(enabled = false)
    public void testOutOfCapacityResponse() throws Exception {
        HashMap<String, Object> properties = new HashMap<>();
        properties.put("table_threads_hard_limit_pct", 5);
        properties.put("max_pending_per_group", 1);
        TestPriorityScheduler scheduler = TestPriorityScheduler.create(new PinotConfiguration(properties));
        scheduler.start();

        ArrayList<ListenableFuture<byte[]>> responses = new ArrayList<>();
        responses.add(scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS)));
        // The group for table "1" is looked up only after the first submit.
        TestSchedulerGroup group = TestPriorityScheduler._groupFactory._groupMap.get("1");
        group.addReservedThreads(10);
        group.addLast(TestHelper.createQueryRequest("1", METRICS));
        responses.add(scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS)));

        // Only the second submission is expected to be rejected.
        DataTable secondResponse = DataTableFactory.getDataTable(responses.get(1).get());
        String outOfCapacityKey = "Exception" + QueryException.SERVER_OUT_OF_CAPACITY_ERROR.getErrorCode();
        Assert.assertTrue(secondResponse.getMetadata().containsKey(outOfCapacityKey));
        scheduler.stop();
    }

    /**
     * Submits a query to a scheduler that was never started and verifies that the response reports
     * {@code SERVER_SCHEDULER_DOWN_ERROR} and carries no {@code TABLE} metadata entry, i.e. the stub executor
     * did not produce it.
     */
    @Test
    public void testSubmitBeforeRunning() throws ExecutionException, InterruptedException, IOException {
        TestPriorityScheduler scheduler = TestPriorityScheduler.create();

        ListenableFuture<byte[]> pendingResponse = scheduler.submit(TestHelper.createServerQueryRequest("1", METRICS));
        DataTable response = DataTableFactory.getDataTable(pendingResponse.get());

        Integer schedulerDownCode = Integer.valueOf(QueryException.SERVER_SCHEDULER_DOWN_ERROR.getErrorCode());
        Assert.assertTrue(response.getExceptions().containsKey(schedulerDownCode));
        Assert.assertFalse(response.getMetadata().containsKey(DataTable.MetadataKey.TABLE.getName()));

        scheduler.stop();
    }
}
