package org.apache.pinot.core.accounting;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
import org.apache.pinot.core.accounting.utils.RunnerWorkerThreadOffsetProvider;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.Tracing;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/accounting/ResourceManagerAccountingTest.class */
public class ResourceManagerAccountingTest {
    public static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerAccountingTest.class);

    /**
     * Verifies that {@link RunnerWorkerThreadOffsetProvider} hands out exactly one offset per pool thread.
     *
     * <p>The resource manager is built with 2 runner threads and 5 worker threads. 1000 tasks are submitted to
     * each pool; every task records the offset returned by the shared provider. After all tasks have completed
     * the recorded offsets must be exactly {@code 0..6}, i.e. one per thread across both pools.
     *
     * @throws Exception if any submitted task fails or the waiting thread is interrupted
     */
    @Test
    public void testThreadIDProvider() throws Exception {
        final int numRunnerThreads = 2;
        final int numWorkerThreads = 5;
        final int tasksPerPool = 1000;
        final int expectedOffsets = numRunnerThreads + numWorkerThreads;

        ResourceManager resourceManager =
                getResourceManager(numRunnerThreads, numWorkerThreads, 1, 3, Collections.emptyMap());
        Future<?>[] futures = new Future<?>[2 * tasksPerPool];
        ConcurrentHashMap.KeySetView<Integer, Boolean> observedOffsets = ConcurrentHashMap.newKeySet();
        RunnerWorkerThreadOffsetProvider offsetProvider = new RunnerWorkerThreadOffsetProvider();

        for (int i = 0; i < tasksPerPool; i++) {
            // Worker futures occupy the upper half of the array, runner futures the lower half.
            futures[i + tasksPerPool] = resourceManager.getQueryWorkers().submit(() -> {
                observedOffsets.add(offsetProvider.get());
            });
            futures[i] = resourceManager.getQueryRunners().submit(() -> {
                observedOffsets.add(offsetProvider.get());
            });
        }

        // Propagate task failures instead of swallowing them: a failing task must fail the test.
        for (Future<?> future : futures) {
            future.get();
        }

        Assert.assertEquals(observedOffsets.size(), expectedOffsets);
        for (int offset = 0; offset < expectedOffsets; offset++) {
            Assert.assertTrue(observedOffsets.contains(offset), "missing thread offset " + offset);
        }
    }

    /**
     * Manual harness that generates a multi-threaded load with thread CPU sampling switched on.
     *
     * <p>The method has no {@code @Test} annotation and ends with a very long sleep, so it is presumably meant
     * to be enabled by hand while watching the DEBUG log output that it turns on. It submits 30 runner tasks
     * ({@code q0..q29}); each runner fans out to 10 worker tasks. Worker {@code t} performs {@code (t + 1) * 10}
     * rounds of: sample, a short burst of atomic additions, then a 200 ms sleep.
     *
     * @throws Exception if the final sleep is interrupted
     */
    public void testCPUtimeProvider() throws Exception {
        LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
                .setLevel(Level.DEBUG);
        LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
        ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
        ServerMetrics.register(Mockito.mock(ServerMetrics.class));

        Map<String, Object> configs = new HashMap<>();
        configs.put("accounting.oom.alarming.usage.ratio", 0.0f);
        configs.put("accounting.factory.name", "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
        configs.put("accounting.enable.thread.memory.sampling", false);
        configs.put("accounting.enable.thread.cpu.sampling", true);
        ResourceManager resourceManager = getResourceManager(20, 40, 1, 1, configs);

        // Sink for the busy work so the additions have an observable side effect.
        AtomicInteger busyWorkSink = new AtomicInteger();
        for (int queryIdx = 0; queryIdx < 30; queryIdx++) {
            String queryId = "q" + queryIdx;
            resourceManager.getQueryRunners().submit(() -> {
                Tracing.ThreadAccountantOps.setupRunner(queryId);
                CountDownLatch workersDone = new CountDownLatch(10);
                ThreadExecutionContext runnerContext = Tracing.getThreadAccountant().getThreadExecutionContext();
                for (int t = 0; t < 10; t++) {
                    int taskId = t;
                    resourceManager.getQueryWorkers().submit(() -> {
                        Tracing.ThreadAccountantOps.setupWorker(taskId, new ThreadResourceUsageProvider(),
                                runnerContext);
                        int rounds = (taskId + 1) * 10;
                        for (int round = 0; round < rounds; round++) {
                            Tracing.ThreadAccountantOps.sample();
                            for (int k = 0; k < 1000; k++) {
                                busyWorkSink.getAndAccumulate(k % 178123, Integer::sum);
                            }
                            try {
                                Thread.sleep(200L);
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to the pool and stop generating load.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                        Tracing.ThreadAccountantOps.clear();
                        workersDone.countDown();
                    });
                }
                try {
                    workersDone.await();
                    // Linger after the workers finish before clearing the runner's accounting state.
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                Tracing.ThreadAccountantOps.clear();
            });
        }
        // Keep the calling thread alive while the submitted tasks run.
        Thread.sleep(1000000L);
    }

    /**
     * Manual harness that generates a multi-threaded allocation load with thread memory sampling switched on.
     *
     * <p>The method has no {@code @Test} annotation and ends with a very long sleep, so it is presumably meant
     * to be enabled by hand while watching the DEBUG log output that it turns on. It submits 30 runner tasks
     * ({@code q0..q29}); each runner fans out to 10 worker tasks. Worker {@code t} performs {@code (t + 1) * 10}
     * rounds of: sample, allocate and retain a {@code long[1000]}, then a 200 ms sleep.
     *
     * @throws Exception if the final sleep is interrupted
     */
    public void testThreadMemoryAccounting() throws Exception {
        LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
                .setLevel(Level.DEBUG);
        LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
        ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
        ServerMetrics.register(Mockito.mock(ServerMetrics.class));

        Map<String, Object> configs = new HashMap<>();
        configs.put("accounting.oom.alarming.usage.ratio", 0.0f);
        configs.put("accounting.factory.name", "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
        configs.put("accounting.enable.thread.memory.sampling", true);
        configs.put("accounting.enable.thread.cpu.sampling", false);
        ResourceManager resourceManager = getResourceManager(20, 40, 1, 1, configs);

        for (int queryIdx = 0; queryIdx < 30; queryIdx++) {
            String queryId = "q" + queryIdx;
            resourceManager.getQueryRunners().submit(() -> {
                Tracing.ThreadAccountantOps.setupRunner(queryId);
                CountDownLatch workersDone = new CountDownLatch(10);
                ThreadExecutionContext runnerContext = Tracing.getThreadAccountant().getThreadExecutionContext();
                for (int t = 0; t < 10; t++) {
                    int taskId = t;
                    resourceManager.getQueryWorkers().submit(() -> {
                        Tracing.ThreadAccountantOps.setupWorker(taskId, new ThreadResourceUsageProvider(),
                                runnerContext);
                        // Array of arrays: each round stores one freshly allocated row so it stays reachable.
                        long[][] retained = new long[1000][];
                        int rounds = (taskId + 1) * 10;
                        for (int round = 0; round < rounds; round++) {
                            Tracing.ThreadAccountantOps.sample();
                            retained[round] = new long[1000];
                            try {
                                Thread.sleep(200L);
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to the pool and stop generating load.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                        Tracing.ThreadAccountantOps.clear();
                        // Reading the data keeps the allocations from being trivially dead.
                        System.out.println(retained[0][0]);
                        workersDone.countDown();
                    });
                }
                try {
                    workersDone.await();
                    // Linger after the workers finish before clearing the runner's accounting state.
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                Tracing.ThreadAccountantOps.clear();
            });
        }
        // Keep the calling thread alive while the submitted tasks run.
        Thread.sleep(1000000L);
    }

    /**
     * Verifies that worker tasks polling the runner thread's interrupt flag terminate with an
     * {@link EarlyTerminationException} once the runner thread is interrupted.
     *
     * <p>A single runner task submits five worker tasks. Each worker loops, sleeping 5 ms per iteration, and
     * throws {@link EarlyTerminationException} as soon as it sees the runner thread interrupted. The test
     * interrupts the runner and expects every worker future to complete exceptionally (done, not cancelled).
     *
     * <p>Synchronisation is latch based rather than sleep based: the test waits until the runner has published
     * its thread and all worker futures before interrupting, and the runner is released when the test is
     * finished so it does not spin forever.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    public void testWorkerThreadInterruption() throws Exception {
        final int numWorkers = 5;
        ResourceManager resourceManager = getResourceManager(2, 5, 1, 3, Collections.emptyMap());
        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);

        // Written by the runner before workersSubmitted.countDown(), read by the test after await().
        Future<?>[] workerFutures = new Future<?>[numWorkers];
        AtomicReference<Thread> runnerThread = new AtomicReference<>();
        CountDownLatch workersSubmitted = new CountDownLatch(1);
        CountDownLatch testFinished = new CountDownLatch(1);

        resourceManager.getQueryRunners().submit(() -> {
            Thread runner = Thread.currentThread();
            runnerThread.set(runner);
            try {
                for (int w = 0; w < numWorkers; w++) {
                    workerFutures[w] = resourceManager.getQueryWorkers().submit(() -> {
                        for (int iteration = 0; iteration < 1000000; iteration++) {
                            try {
                                Thread.sleep(5L);
                            } catch (InterruptedException e) {
                                // The worker reacts to the runner's interrupt flag only, not to its own.
                            }
                            if (runner.isInterrupted()) {
                                throw new EarlyTerminationException();
                            }
                        }
                    });
                }
            } finally {
                // Always unblock the test, even if a submission failed.
                workersSubmitted.countDown();
            }
            // Stay alive with the interrupt flag intact: a blocking wait here would throw
            // InterruptedException and clear the flag the workers are polling. getCount() does not block.
            while (testFinished.getCount() > 0) {
                // busy-wait until the test releases this thread
            }
        });

        workersSubmitted.await();
        try {
            runnerThread.get().interrupt();
            for (Future<?> workerFuture : workerFutures) {
                try {
                    workerFuture.get();
                    Assert.fail("Expected EarlyTerminationException to be thrown");
                } catch (ExecutionException e) {
                    Assert.assertFalse(workerFuture.isCancelled());
                    Assert.assertTrue(workerFuture.isDone());
                    Assert.assertEquals(e.getMessage(), "org.apache.pinot.spi.exception.EarlyTerminationException");
                }
            }
        } finally {
            // Release the spinning runner so the pool thread is not leaked.
            testFinished.countDown();
        }
    }

    /**
     * Manual harness that generates a heavy allocation load with memory sampling and the
     * {@code accounting.oom.enable.killing.query} option switched on.
     *
     * <p>The method has no {@code @Test} annotation and ends with a very long sleep, so it is presumably meant
     * to be enabled by hand while watching the log output. It submits 4 runner tasks ({@code q0..q3}); each
     * runner fans out to 10 worker tasks. Every worker of query {@code q} performs {@code (q + 1) * 80} rounds
     * of: sample, check for interruption, then allocate and retain a {@code long[200000]}.
     *
     * <p>A worker stops early, without counting down the latch, when either it or its runner thread has been
     * interrupted. A runner interrupted while waiting cancels all of its worker futures.
     *
     * @throws Exception if the final sleep is interrupted
     */
    public void testThreadMemory() throws Exception {
        LogManager.getLogger(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant.class)
                .setLevel(Level.DEBUG);
        LogManager.getLogger(ThreadResourceUsageProvider.class).setLevel(Level.DEBUG);
        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
        ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
        ServerMetrics.register(Mockito.mock(ServerMetrics.class));

        Map<String, Object> configs = new HashMap<>();
        configs.put("accounting.oom.alarming.usage.ratio", 0.0f);
        configs.put("accounting.oom.critical.heap.usage.ratio", 0.9f);
        configs.put("accounting.oom.enable.killing.query", true);
        configs.put("accounting.factory.name", "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
        configs.put("accounting.enable.thread.memory.sampling", true);
        configs.put("accounting.enable.thread.cpu.sampling", false);
        ResourceManager resourceManager = getResourceManager(20, 40, 1, 1, configs);

        for (int q = 0; q < 4; q++) {
            int queryIdx = q;
            resourceManager.getQueryRunners().submit(() -> {
                String queryId = "q" + queryIdx;
                Tracing.ThreadAccountantOps.setupRunner(queryId);
                Thread runner = Thread.currentThread();
                CountDownLatch workersDone = new CountDownLatch(10);
                Future<?>[] workerFutures = new Future<?>[10];
                ThreadExecutionContext runnerContext = Tracing.getThreadAccountant().getThreadExecutionContext();
                for (int t = 0; t < 10; t++) {
                    int taskId = t;
                    workerFutures[t] = resourceManager.getQueryWorkers().submit(() -> {
                        Tracing.ThreadAccountantOps.setupWorker(taskId, new ThreadResourceUsageProvider(),
                                runnerContext);
                        // Array of arrays: each round stores one freshly allocated row so it stays reachable.
                        long[][] retained = new long[1000][];
                        int rounds = (queryIdx + 1) * 80;
                        for (int round = 0; round < rounds; round++) {
                            Tracing.ThreadAccountantOps.sample();
                            // Stop when this worker was interrupted (e.g. cancelled) or its runner was.
                            if (Thread.interrupted() || runner.isInterrupted()) {
                                Tracing.ThreadAccountantOps.clear();
                                LOGGER.error("KilledWorker " + queryId + " " + taskId);
                                return;
                            }
                            retained[round] = new long[200000];
                            for (int k = 0; k < 10000; k++) {
                                retained[round][k] = k % 178123;
                            }
                        }
                        Tracing.ThreadAccountantOps.clear();
                        // Reading the data keeps the allocations from being trivially dead.
                        System.out.println(retained[0][0]);
                        workersDone.countDown();
                    });
                }
                try {
                    workersDone.await();
                } catch (InterruptedException e) {
                    // The runner was interrupted while waiting: cancel its workers as well.
                    for (Future<?> workerFuture : workerFutures) {
                        workerFuture.cancel(true);
                    }
                    LOGGER.error("Killed " + queryId);
                }
                Tracing.ThreadAccountantOps.clear();
            });
        }
        // Keep the calling thread alive while the submitted tasks run.
        Thread.sleep(1000000L);
    }

    /**
     * Builds a {@link ResourceManager} for the tests in this class.
     *
     * <p>The returned manager reports fixed table-thread limits and hands out executor services that simply
     * forward every task to the manager's query-worker pool.
     *
     * @param numRunnerThreads value forwarded to {@code getConfig} as the runner thread count
     * @param numWorkerThreads value forwarded to {@code getConfig} as the worker thread count
     * @param softLimit value returned by {@code getTableThreadsSoftLimit()}
     * @param hardLimit value returned by {@code getTableThreadsHardLimit()}
     * @param extraConfigs additional configuration entries forwarded to {@code getConfig}
     * @return the configured resource manager
     */
    private ResourceManager getResourceManager(int numRunnerThreads, int numWorkerThreads, final int softLimit,
            final int hardLimit, Map<String, Object> extraConfigs) {
        PinotConfiguration configuration = getConfig(numRunnerThreads, numWorkerThreads, extraConfigs);
        return new ResourceManager(configuration) {
            public QueryExecutorService getExecutorService(ServerQueryRequest query,
                    SchedulerGroupAccountant accountant) {
                // Neither the query nor the accountant is consulted: all work goes to the shared worker pool.
                return new QueryExecutorService() {
                    public void execute(Runnable command) {
                        getQueryWorkers().execute(command);
                    }
                };
            }

            public int getTableThreadsHardLimit() {
                return hardLimit;
            }

            public int getTableThreadsSoftLimit() {
                return softLimit;
            }
        };
    }

    /**
     * Creates a {@link PinotConfiguration} from the given entries plus the two thread-count settings.
     *
     * <p>The supplied map is copied, not modified. The {@code query_runner_threads} and
     * {@code query_worker_threads} keys are written after the copy, so they replace any entries with the same
     * keys in {@code overrides}.
     *
     * @param numRunnerThreads value stored under {@code query_runner_threads}
     * @param numWorkerThreads value stored under {@code query_worker_threads}
     * @param overrides additional configuration entries
     * @return a configuration backed by the merged entries
     */
    private PinotConfiguration getConfig(int numRunnerThreads, int numWorkerThreads, Map<String, Object> overrides) {
        Map<String, Object> properties = new HashMap<>(overrides);
        properties.put("query_runner_threads", numRunnerThreads);
        properties.put("query_worker_threads", numWorkerThreads);
        return new PinotConfiguration(properties);
    }
}
