package org.apache.pinot.broker.broker;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.ServiceUnavailableException;
import org.apache.pinot.broker.broker.BrokerManagedAsyncExecutorProvider;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/broker/BrokerManagedAsyncExecutorProviderTest.class */
/**
 * Tests for the executor service handed out by {@link BrokerManagedAsyncExecutorProvider}.
 *
 * <p>Each test builds its own provider and always shuts the resulting executor down, so no worker threads
 * outlive the test that created them.
 */
public class BrokerManagedAsyncExecutorProviderTest {
    /** Metrics instance shared by every test; created once in {@link #setUp()}. */
    public static BrokerMetrics _brokerMetrics;

    /** Builds the shared {@link BrokerMetrics} from a default (empty) {@link PinotConfiguration}. */
    @BeforeClass
    public void setUp() {
        _brokerMetrics = new BrokerMetrics("pinot.broker.", PinotMetricUtils.getPinotMetricsRegistry(new PinotConfiguration()), true, Collections.emptyList());
    }

    /**
     * Submits a single callable and checks that its result comes back and that the executor terminates cleanly.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if the submitted task fails
     */
    @Test
    public void testExecutorService() throws InterruptedException, ExecutionException {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) new BrokerManagedAsyncExecutorProvider(2, 2, 2, _brokerMetrics).getExecutorService();
        try {
            Integer result = executor.submit(() -> 2).get();
            Assert.assertNotNull(result);
            Assert.assertEquals(result.intValue(), 2);
            executor.shutdown();
            Assert.assertTrue(executor.awaitTermination(1L, TimeUnit.SECONDS), "executor did not terminate within 1 second");
        } finally {
            // No-op after a clean shutdown; prevents a thread leak when an assertion above fails.
            executor.shutdownNow();
        }
    }

    /**
     * Verifies the configuration of the returned executor (pool sizes, queue type and capacity, rejection
     * handler type) and that it actually runs a submitted task.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the task
     */
    @Test
    public void testGet() throws InterruptedException {
        ExecutorService executorService = new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics).getExecutorService();
        Assert.assertNotNull(executorService);
        try {
            Assert.assertTrue(executorService instanceof ThreadPoolExecutor);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;

            // TestNG's assertEquals takes (actual, expected).
            Assert.assertEquals(executor.getCorePoolSize(), 1);
            Assert.assertEquals(executor.getMaximumPoolSize(), 1);

            BlockingQueue<Runnable> queue = executor.getQueue();
            Assert.assertNotNull(queue);
            Assert.assertTrue(queue instanceof ArrayBlockingQueue);
            Assert.assertEquals(queue.size(), 0);
            Assert.assertEquals(queue.remainingCapacity(), 1);

            RejectedExecutionHandler rejectionHandler = executor.getRejectedExecutionHandler();
            Assert.assertNotNull(rejectionHandler);
            Assert.assertTrue(rejectionHandler instanceof BrokerManagedAsyncExecutorProvider.BrokerThreadPoolRejectExecutionHandler);

            AtomicInteger counter = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(1);
            executor.execute(() -> {
                counter.incrementAndGet();
                done.countDown();
            });
            // Bounded wait: a broken executor should fail the test rather than hang the suite.
            Assert.assertTrue(done.await(5L, TimeUnit.SECONDS), "submitted task did not run within 5 seconds");
            Assert.assertEquals(counter.get(), 1);
        } finally {
            executorService.shutdownNow();
        }
    }

    /**
     * Verifies that submitting more work than the executor can hold surfaces a
     * {@link ServiceUnavailableException} to the submitter.
     *
     * <p>Every task blocks on a latch, so with one worker thread and a one-slot queue the first task occupies
     * the worker, the second fills the queue, and the third submission must be rejected. Without the blocking,
     * the worker could drain the queue between submissions and the rejection would depend on thread timing.
     *
     * @throws InterruptedException declared for signature compatibility; not thrown by the current body
     */
    @Test(expectedExceptions = {ServiceUnavailableException.class})
    public void testRejectHandler() throws InterruptedException {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) new BrokerManagedAsyncExecutorProvider(1, 1, 1, _brokerMetrics).getExecutorService();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        // Restore the flag so the worker can exit promptly on shutdownNow().
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Reaching this point means all ten submissions were accepted, i.e. nothing was rejected.
            Assert.fail("expected the saturated executor to reject a task");
        } finally {
            // Unblock the accepted tasks and stop the worker regardless of the outcome.
            release.countDown();
            executor.shutdownNow();
        }
    }
}
