package org.apache.pinot.core.accounting;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.commons.compress.harmony.pack200.PackingOptions;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.accounting.CPUMemThreadLevelAccountingObjects;
import org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.class */
public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory {

    /* loaded from: input_file:org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory$PerQueryCPUMemResourceUsageAccountant.class */
    public static class PerQueryCPUMemResourceUsageAccountant extends Tracing.DefaultThreadResourceUsageAccountant {
        // Source of heap usage figures; WatcherTask reads getHeapMemoryUsage() from it.
        // Declared without an initializer, so it is assigned in a static initializer outside this declaration.
        static final MemoryMXBean MEMORY_MX_BEAN;
        private static final Logger LOGGER;
        // When true, WatcherTask.evalTriggers() may escalate an "alarming" heap level to
        // TriggeringLevel.HeapMemoryAlarmingVerbose.
        private static final boolean IS_DEBUG_MODE_ENABLED;
        // NOTE(review): presumably the name and priority applied to the watcher thread; their use is not in this
        // part of the file - confirm.
        private static final String ACCOUNTANT_TASK_NAME = "CPUMemThreadAccountant";
        private static final int ACCOUNTANT_PRIORITY = 4;
        // NOTE(review): presumably the executor that runs _watcherTask; the submission site is not in this part
        // of the file - confirm.
        private static final ExecutorService EXECUTOR_SERVICE;
        // Configuration passed to the constructor; WatcherTask reads all of its thresholds and flags from it.
        private final PinotConfiguration _config;
        // One ThreadEntry per thread, registered by the _threadLocalEntry initializer below and iterated by
        // WatcherTask.killAllQueries().
        private final ConcurrentHashMap<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadEntriesMap = new ConcurrentHashMap<>();
        // CPU time (ns) accumulated per key by updateQueryUsageConcurrently().
        private final ConcurrentHashMap<String, Long> _concurrentTaskCPUStatsAggregator = new ConcurrentHashMap<>();
        // NOTE(review): presumably the allocated-bytes counterpart of the map above; its writer is not fully
        // visible in this part of the file - confirm.
        private final ConcurrentHashMap<String, Long> _concurrentTaskMemStatsAggregator = new ConcurrentHashMap<>();
        // NOTE(review): plain (non-concurrent) maps; presumably touched only from the watcher thread for tasks
        // that already finished - their use is not in this part of the file, confirm.
        private final HashMap<String, Long> _finishedTaskCPUStatsAggregator = new HashMap<>();
        private final HashMap<String, Long> _finishedTaskMemStatsAggregator = new HashMap<>();
        // Lazily creates the calling thread's ThreadEntry on first access, registers it in _threadEntriesMap
        // under Thread.currentThread() and logs the registration at INFO.
        private final ThreadLocal<CPUMemThreadLevelAccountingObjects.ThreadEntry> _threadLocalEntry = ThreadLocal.withInitial(() -> {
            CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = new CPUMemThreadLevelAccountingObjects.ThreadEntry();
            this._threadEntriesMap.put(Thread.currentThread(), threadEntry);
            LOGGER.info("Adding thread to _threadLocalEntry: {}", Thread.currentThread().getName());
            return threadEntry;
        });
        // Per-thread usage provider holder; created empty (no initial value) in the constructor.
        private final ThreadLocal<ThreadResourceUsageProvider> _threadResourceUsageProvider;
        // True only when the sampling config flag is on AND ThreadResourceUsageProvider reports the matching
        // measurement as enabled (see the constructor).
        private final boolean _isThreadCPUSamplingEnabled;
        private final boolean _isThreadMemorySamplingEnabled;
        // Starts as an empty HashSet. NOTE(review): presumably consumed by cleanInactive() - confirm.
        private final Set<String> _inactiveQuery;
        // The periodic heap/CPU watcher; built last in the constructor because it reads the fields above.
        private final WatcherTask _watcherTask;
        // Identifier of this instance, embedded in the messages of query-kill exceptions.
        private final String _instanceId;
        // Decompiler artifact: this name is the flag javac generates to back `assert` statements.
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: protected */
        /* loaded from: input_file:org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory$PerQueryCPUMemResourceUsageAccountant$AggregatedStats.class */
        /**
         * Mutable running total of CPU time and allocated bytes attributed to one query.
         *
         * <p>An instance starts from the figures of one contributor and grows through {@link #merge}. It also
         * carries the anchor thread and the exception slot that the watcher uses to deliver a kill.
         * The class performs no synchronization of its own.
         */
        public static class AggregatedStats {
            final String _queryId;
            final Thread _anchorThread;
            boolean _isAnchorThread;
            AtomicReference<Exception> _exceptionAtomicReference;
            long _allocatedBytes;
            long _cpuNS;

            /**
             * @param cpuNS initial CPU time, in nanoseconds
             * @param allocatedBytes initial allocated bytes
             * @param anchorThread thread that is interrupted when the query is killed
             * @param isAnchorThread whether the initial contributor is the anchor thread itself
             * @param exceptionAtomicReference slot into which a kill exception is written
             * @param queryId identifier of the query these totals belong to
             */
            public AggregatedStats(long cpuNS, long allocatedBytes, Thread anchorThread, boolean isAnchorThread, AtomicReference<Exception> exceptionAtomicReference, String queryId) {
                this._cpuNS = cpuNS;
                this._allocatedBytes = allocatedBytes;
                this._anchorThread = anchorThread;
                this._isAnchorThread = isAnchorThread;
                this._exceptionAtomicReference = exceptionAtomicReference;
                this._queryId = queryId;
            }

            /** Renders every field; {@code _cpuNS} shows the CPU time (it previously repeated the query id). */
            @Override
            public String toString() {
                return "AggregatedStats{_queryId='" + this._queryId
                        + "', _anchorThread=" + this._anchorThread
                        + ", _isAnchorThread=" + this._isAnchorThread
                        + ", _exceptionAtomicReference=" + this._exceptionAtomicReference
                        + ", _allocatedBytes=" + this._allocatedBytes
                        + ", _cpuNS=" + this._cpuNS
                        + "}";
            }

            /** @return accumulated CPU time, in nanoseconds */
            public long getCpuNS() {
                return this._cpuNS;
            }

            /** @return accumulated allocated bytes */
            public long getAllocatedBytes() {
                return this._allocatedBytes;
            }

            /** @return the thread to interrupt when this query is killed */
            public Thread getAnchorThread() {
                return this._anchorThread;
            }

            /**
             * Adds another contributor's usage to the running totals.
             *
             * @param cpuNS CPU nanoseconds to add
             * @param allocatedBytes allocated bytes to add
             * @param isAnchorThread when true, the anchor flag is set and the exception slot is replaced;
             *     when false both are left untouched
             * @param exceptionAtomicReference exception slot of the contributor, used only if it is the anchor
             * @return this instance, to allow chaining
             */
            public AggregatedStats merge(long cpuNS, long allocatedBytes, boolean isAnchorThread, AtomicReference<Exception> exceptionAtomicReference) {
                this._cpuNS += cpuNS;
                this._allocatedBytes += allocatedBytes;
                if (isAnchorThread) {
                    this._isAnchorThread = true;
                    this._exceptionAtomicReference = exceptionAtomicReference;
                }
                return this;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory$PerQueryCPUMemResourceUsageAccountant$TriggeringLevel.class */
        /**
         * Outcome of one watcher iteration's trigger evaluation; it selects what
         * {@code WatcherTask.triggeredActions()} does. {@code Normal} is the only level for which
         * {@code WatcherTask.run()} passes {@code false} to {@code aggregate(...)}, so keep it declared first.
         */
        public enum TriggeringLevel {
            // Nothing triggered; run() resets to this at the start of every iteration.
            Normal,
            // Heap above the alarming level while debug mode is on and nothing else triggered: usage is only logged.
            HeapMemoryAlarmingVerbose,
            // CPU-time-based killing is enabled: queries over the CPU threshold are killed.
            CPUTimeBasedKilling,
            // Heap above the critical level: the most expensive query becomes a kill candidate.
            HeapMemoryCritical,
            // Heap at or above the panic level: killAllQueries() is invoked.
            HeapMemoryPanic
        }

        /* loaded from: input_file:org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory$PerQueryCPUMemResourceUsageAccountant$WatcherTask.class */
        class WatcherTask implements Runnable {
            // Maximum heap size reported by the memory MX bean when this task is constructed.
            private final long _maxHeapSize = PerQueryCPUMemResourceUsageAccountant.MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
            // The byte thresholds below are all _maxHeapSize multiplied by a configured ratio (see the constructor).
            // A query must have allocated more than this to be killed on memory grounds.
            private final long _minMemoryFootprintForKill;
            // Used heap at or above this makes outOfMemoryPanicTrigger() call killAllQueries().
            private final long _panicLevel;
            // Used heap above this selects TriggeringLevel.HeapMemoryCritical.
            private final long _criticalLevel;
            // _criticalLevel minus a configured delta; if a forced GC brings usage below it, no further kill happens.
            private final long _criticalLevelAfterGC;
            // Number of consecutive kills after which killMostExpensiveQuery() forces a GC before killing again.
            private final int _gcBackoffCount;
            // Used heap above this shortens the sleep between iterations to _alarmingSleepTime.
            private final long _alarmingLevel;
            // Sleep between iterations, in milliseconds (passed to Thread.sleep).
            private final int _normalSleepTime;
            // Milliseconds slept after the forced System.gc() in killMostExpensiveQuery().
            private final int _gcWaitTime;
            private final int _alarmingSleepTimeDenominator;
            // _normalSleepTime / _alarmingSleepTimeDenominator (integer division).
            private final int _alarmingSleepTime;
            // Master switch: when false, queries are only reported, never killed, on memory pressure.
            private final boolean _oomKillQueryEnabled;
            // When true, _usedBytes is published to _memoryUsageGauge at the end of every iteration.
            private final boolean _publishHeapUsageMetric;
            // True only when the config flag is on AND thread CPU sampling is enabled on the accountant.
            private final boolean _isCPUTimeBasedKillingEnabled;
            // Per-query CPU budget in nanoseconds, derived from a millisecond config value.
            private final long _cpuTimeBasedKillingThresholdNS;
            // When true, every kill is counted on _queryKilledMeter.
            private final boolean _isQueryKilledMetricEnabled;
            // Selects broker vs. server metrics and is embedded in kill messages.
            private final InstanceType _instanceType;
            // Heap bytes in use as of the latest collectTriggerMetrics() (or post-GC re-read).
            private long _usedBytes;
            // Sleep chosen for the current iteration; reset to _normalSleepTime at the start of each one.
            private int _sleepTime;
            // Incremented by interruptRunnerThread(); reset to 0 by killAllQueries(), by the GC back-off and by
            // iterations that do not kill on heap pressure.
            private int _numQueriesKilledConsecutively;
            // Result of aggregate(...) for the current iteration; null until it has been computed.
            protected Map<String, AggregatedStats> _aggregatedUsagePerActiveQuery;
            private TriggeringLevel _triggeringLevel;
            // Metrics sink plus the meters/gauge matching _instanceType (chosen in the constructor).
            private final AbstractMetrics _metrics;
            private final AbstractMetrics.Meter _queryKilledMeter;
            private final AbstractMetrics.Meter _heapMemoryCriticalExceededMeter;
            private final AbstractMetrics.Meter _heapMemoryPanicExceededMeter;
            private final AbstractMetrics.Gauge _memoryUsageGauge;

            WatcherTask() {
                this._minMemoryFootprintForKill = (long) (this._maxHeapSize * PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO, 0.025d));
                this._panicLevel = (long) (this._maxHeapSize * PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_PANIC_LEVEL_HEAP_USAGE_RATIO, 0.9900000095367432d));
                this._criticalLevel = (long) (this._maxHeapSize * PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.9599999785423279d));
                this._criticalLevelAfterGC = this._criticalLevel - ((long) (this._maxHeapSize * PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_CRITICAL_LEVEL_HEAP_USAGE_RATIO_DELTA_AFTER_GC, 0.15000000596046448d)));
                this._gcBackoffCount = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_BACKOFF_COUNT, 5);
                this._alarmingLevel = (long) (this._maxHeapSize * PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_ALARMING_LEVEL_HEAP_USAGE_RATIO, 0.75d));
                this._normalSleepTime = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_MS, 30);
                this._gcWaitTime = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_GC_WAIT_TIME_MS, 0);
                this._alarmingSleepTimeDenominator = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_SLEEP_TIME_DENOMINATOR, 3);
                this._alarmingSleepTime = this._normalSleepTime / this._alarmingSleepTimeDenominator;
                this._oomKillQueryEnabled = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, false);
                this._publishHeapUsageMetric = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_PUBLISHING_JVM_USAGE, false);
                this._isCPUTimeBasedKillingEnabled = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_ENABLED, false) && PerQueryCPUMemResourceUsageAccountant.this._isThreadCPUSamplingEnabled;
                this._cpuTimeBasedKillingThresholdNS = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_CPU_TIME_BASED_KILLING_THRESHOLD_MS, 30000) * PackingOptions.SEGMENT_LIMIT;
                this._isQueryKilledMetricEnabled = PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_QUERY_KILLED_METRIC_ENABLED, false);
                this._instanceType = InstanceType.valueOf(PerQueryCPUMemResourceUsageAccountant.this._config.getProperty(CommonConstants.Accounting.CONFIG_OF_INSTANCE_TYPE, CommonConstants.Accounting.DEFAULT_CONFIG_OF_INSTANCE_TYPE.toString()));
                this._numQueriesKilledConsecutively = 0;
                switch (this._instanceType) {
                    case SERVER:
                        this._metrics = ServerMetrics.get();
                        this._queryKilledMeter = ServerMeter.QUERIES_KILLED;
                        this._memoryUsageGauge = ServerGauge.JVM_HEAP_USED_BYTES;
                        this._heapMemoryCriticalExceededMeter = ServerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
                        this._heapMemoryPanicExceededMeter = ServerMeter.HEAP_PANIC_LEVEL_EXCEEDED;
                        return;
                    case BROKER:
                        this._metrics = BrokerMetrics.get();
                        this._queryKilledMeter = BrokerMeter.QUERIES_KILLED;
                        this._memoryUsageGauge = BrokerGauge.JVM_HEAP_USED_BYTES;
                        this._heapMemoryCriticalExceededMeter = BrokerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
                        this._heapMemoryPanicExceededMeter = BrokerMeter.HEAP_PANIC_LEVEL_EXCEEDED;
                        return;
                    default:
                        PerQueryCPUMemResourceUsageAccountant.LOGGER.error("instanceType: {} not supported, using server metrics", this._instanceType);
                        this._metrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
                        this._queryKilledMeter = ServerMeter.QUERIES_KILLED;
                        this._memoryUsageGauge = ServerGauge.JVM_HEAP_USED_BYTES;
                        this._heapMemoryCriticalExceededMeter = ServerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
                        this._heapMemoryPanicExceededMeter = ServerMeter.HEAP_PANIC_LEVEL_EXCEEDED;
                        return;
                }
            }

            /**
             * Watcher loop. Logs the effective settings once, then repeats forever:
             * <ol>
             *   <li>reset the per-iteration state and read the current heap usage;</li>
             *   <li>if the panic trigger fires it has already killed everything, so the regular triggers are
             *       skipped; otherwise evaluate the triggers, aggregate usage per query, and act on the result;</li>
             *   <li>always finish with the same cleanup: debug dump, optional heap gauge, inactive-query cleanup,
             *       and the sleep before the next iteration.</li>
             * </ol>
             * Exceptions raised by the monitoring steps are logged and the loop continues. Any other
             * {@link Throwable} propagates after the cleanup has run.
             */
            @Override
            public void run() {
                LOGGER.info("Starting accountant task for PerQueryCPUMemAccountant.");
                LOGGER.info("Xmx is {}", this._maxHeapSize);
                LOGGER.info("_instanceType is {}", this._instanceType);
                LOGGER.info("_alarmingLevel of on heap memory is {}", this._alarmingLevel);
                LOGGER.info("_criticalLevel of on heap memory is {}", this._criticalLevel);
                LOGGER.info("_criticalLevelAfterGC of on heap memory is {}", this._criticalLevelAfterGC);
                LOGGER.info("_panicLevel of on heap memory is {}", this._panicLevel);
                LOGGER.info("_gcBackoffCount is {}", this._gcBackoffCount);
                LOGGER.info("_gcWaitTime is {}", this._gcWaitTime);
                LOGGER.info("_normalSleepTime is {}", this._normalSleepTime);
                LOGGER.info("_alarmingSleepTime is {}", this._alarmingSleepTime);
                LOGGER.info("_oomKillQueryEnabled: {}", this._oomKillQueryEnabled);
                LOGGER.info("_minMemoryFootprintForKill: {}", this._minMemoryFootprintForKill);
                LOGGER.info("_isCPUTimeBasedKillingEnabled: {}, _cpuTimeBasedKillingThresholdNS: {}", this._isCPUTimeBasedKillingEnabled, this._cpuTimeBasedKillingThresholdNS);
                while (true) {
                    LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
                    this._triggeringLevel = TriggeringLevel.Normal;
                    this._sleepTime = this._normalSleepTime;
                    this._aggregatedUsagePerActiveQuery = null;
                    try {
                        collectTriggerMetrics();
                        if (!outOfMemoryPanicTrigger()) {
                            evalTriggers();
                            // Every level other than Normal asks aggregate(...) for the triggered variant.
                            boolean triggered = this._triggeringLevel != TriggeringLevel.Normal;
                            this._aggregatedUsagePerActiveQuery = PerQueryCPUMemResourceUsageAccountant.this.aggregate(triggered);
                            PerQueryCPUMemResourceUsageAccountant.this.postAggregation(this._aggregatedUsagePerActiveQuery);
                            triggeredActions();
                        }
                    } catch (Exception e) {
                        LOGGER.error("Caught exception while executing stats aggregation and query kill", e);
                    } finally {
                        // Guarded so the whole usage map is not stringified on every tick when debug is off.
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug(this._aggregatedUsagePerActiveQuery == null ? "_aggregatedUsagePerActiveQuery : null" : this._aggregatedUsagePerActiveQuery.toString());
                            LOGGER.debug("_threadEntriesMap size: {}", PerQueryCPUMemResourceUsageAccountant.this._threadEntriesMap.size());
                        }
                        if (this._publishHeapUsageMetric) {
                            this._metrics.setValueOfGlobalGauge(this._memoryUsageGauge, this._usedBytes);
                        }
                        PerQueryCPUMemResourceUsageAccountant.this.cleanInactive();
                        reschedule();
                    }
                }
            }

            /** Re-reads the used heap size from the memory MX bean into {@code _usedBytes} and logs it at debug. */
            private void collectTriggerMetrics() {
                long heapUsed = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
                this._usedBytes = heapUsed;
                LOGGER.debug("Heap used bytes {}", heapUsed);
            }

            /**
             * Panic check: when the used heap has reached {@code _panicLevel}, kills all queries, records the
             * panic level and meter, logs the event and runs a non-triggered aggregation.
             *
             * @return true if the panic level was reached and handled, false otherwise
             */
            private boolean outOfMemoryPanicTrigger() {
                boolean panicReached = this._usedBytes >= this._panicLevel;
                if (panicReached) {
                    killAllQueries();
                    this._triggeringLevel = TriggeringLevel.HeapMemoryPanic;
                    this._metrics.addMeteredGlobalValue(this._heapMemoryPanicExceededMeter, 1L);
                    LOGGER.error("Heap used bytes {}, greater than _panicLevel {}, Killed all queries and triggered gc!", this._usedBytes, this._panicLevel);
                    PerQueryCPUMemResourceUsageAccountant.this.aggregate(false);
                }
                return panicReached;
            }

            /**
             * Chooses the triggering level for this iteration. CPU-time-based killing is the baseline when
             * enabled, and a heap above the critical level overrides it. A heap that is only above the alarming
             * level shortens the sleep and, in debug mode with nothing else triggered, switches to verbose.
             */
            private void evalTriggers() {
                if (this._isCPUTimeBasedKillingEnabled) {
                    this._triggeringLevel = TriggeringLevel.CPUTimeBasedKilling;
                }
                if (this._usedBytes > this._criticalLevel) {
                    this._triggeringLevel = TriggeringLevel.HeapMemoryCritical;
                    this._metrics.addMeteredGlobalValue(this._heapMemoryCriticalExceededMeter, 1L);
                    return;
                }
                if (this._usedBytes > this._alarmingLevel) {
                    this._sleepTime = this._alarmingSleepTime;
                    boolean nothingTriggeredYet = this._triggeringLevel == TriggeringLevel.Normal;
                    if (IS_DEBUG_MODE_ENABLED && nothingTriggeredYet) {
                        this._triggeringLevel = TriggeringLevel.HeapMemoryAlarmingVerbose;
                    }
                }
            }

            /**
             * Carries out the action for the level chosen by {@code evalTriggers()}. Levels that do not kill
             * under heap pressure reset the consecutive-kill counter.
             */
            private void triggeredActions() {
                switch (this._triggeringLevel) {
                    case HeapMemoryCritical:
                        LOGGER.warn("Heap used bytes {} exceeds critical level {}", this._usedBytes, this._criticalLevel);
                        killMostExpensiveQuery();
                        break;
                    case CPUTimeBasedKilling:
                        killCPUTimeExceedQueries();
                        break;
                    case HeapMemoryAlarmingVerbose:
                        LOGGER.warn("Heap used bytes {} exceeds alarming level", this._usedBytes);
                        LOGGER.warn("Query usage aggregation results {}", this._aggregatedUsagePerActiveQuery.toString());
                        // Intentional fall-through: verbose mode resets the counter exactly like the default.
                    default:
                        this._numQueriesKilledConsecutively = 0;
                        break;
                }
            }

            /** Sleeps for {@code _sleepTime} milliseconds before the next watcher iteration. */
            void reschedule() {
                long pauseMs = this._sleepTime;
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ignored) {
                    // Deliberately dropped: an interrupt only cuts the pause short.
                }
            }

            /**
             * Last-resort kill. If OOM killing is enabled, every thread entry whose current task is an anchor
             * thread gets an error set and its anchor thread interrupted. The method then optionally meters the
             * kill count, sleeps one normal interval, forces a GC and resets the consecutive-kill counter.
             * It does nothing when OOM killing is disabled.
             */
            void killAllQueries() {
                if (!this._oomKillQueryEnabled) {
                    return;
                }
                int killedCount = 0;
                for (CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry : PerQueryCPUMemResourceUsageAccountant.this._threadEntriesMap.values()) {
                    CPUMemThreadLevelAccountingObjects.TaskEntry task = threadEntry.getCurrentThreadTaskStatus();
                    if (task == null || !task.isAnchorThread()) {
                        continue;
                    }
                    threadEntry._errorStatus.set(new RuntimeException(String.format("Query killed due to %s out of memory!", this._instanceType)));
                    task.getAnchorThread().interrupt();
                    killedCount++;
                }
                if (this._isQueryKilledMetricEnabled) {
                    this._metrics.addMeteredGlobalValue(this._queryKilledMeter, killedCount);
                }
                try {
                    Thread.sleep(this._normalSleepTime);
                } catch (InterruptedException ignored) {
                    // Deliberately dropped: the pause is best-effort before forcing the GC below.
                }
                System.gc();
                this._numQueriesKilledConsecutively = 0;
            }

            /**
             * Reacts to a critical heap level by picking the most expensive active query and, if OOM killing
             * is enabled, killing it.
             *
             * <p>After {@code _gcBackoffCount} consecutive kills a GC is forced first. If that brings the heap
             * below {@code _criticalLevelAfterGC}, nothing is killed this round. The victim is chosen by
             * allocated bytes when memory sampling is enabled, otherwise by CPU time. Nothing is killed when
             * neither kind of sampling is enabled or when there is no active query.
             */
            private void killMostExpensiveQuery() {
                boolean hasActiveQueries = !this._aggregatedUsagePerActiveQuery.isEmpty();
                if (hasActiveQueries && this._numQueriesKilledConsecutively >= this._gcBackoffCount) {
                    this._numQueriesKilledConsecutively = 0;
                    System.gc();
                    try {
                        Thread.sleep(this._gcWaitTime);
                    } catch (InterruptedException ignored) {
                        // Deliberately dropped: the wait only gives the GC time to finish before re-measuring.
                    }
                    this._usedBytes = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
                    if (this._usedBytes < this._criticalLevelAfterGC) {
                        return;
                    }
                    LOGGER.error("After GC, heap used bytes {} still exceeds _criticalLevelAfterGC level {}", this._usedBytes, this._criticalLevelAfterGC);
                }
                boolean memorySampling = PerQueryCPUMemResourceUsageAccountant.this._isThreadMemorySamplingEnabled;
                boolean cpuSampling = PerQueryCPUMemResourceUsageAccountant.this._isThreadCPUSamplingEnabled;
                if (!memorySampling && !cpuSampling) {
                    LOGGER.warn("But unable to kill query because neither memory nor cpu tracking is enabled");
                    return;
                }
                if (!hasActiveQueries) {
                    LOGGER.debug("No active queries to kill");
                    return;
                }
                if (memorySampling) {
                    killTopMemoryConsumer();
                } else {
                    killTopCpuConsumer();
                }
                LOGGER.warn("Query aggregation results {} for the previous kill.", this._aggregatedUsagePerActiveQuery.toString());
            }

            /** Picks the query with the most allocated bytes and kills it if enabled and above the minimum footprint. */
            private void killTopMemoryConsumer() {
                AggregatedStats top = Collections.max(this._aggregatedUsagePerActiveQuery.values(), Comparator.comparingLong(AggregatedStats::getAllocatedBytes));
                if (!this._oomKillQueryEnabled) {
                    LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false because oomKillQueryEnabled is false", top._queryId, top._allocatedBytes);
                    return;
                }
                if (top._allocatedBytes <= this._minMemoryFootprintForKill) {
                    LOGGER.warn("But all queries are below quota, no query killed");
                    return;
                }
                top._exceptionAtomicReference.set(new RuntimeException(String.format(" Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota", top._queryId, top.getAllocatedBytes(), this._instanceType, PerQueryCPUMemResourceUsageAccountant.this._instanceId)));
                interruptRunnerThread(top.getAnchorThread());
                LOGGER.error("Query {} got picked because using {} bytes of memory, actual kill committed true", top._queryId, top._allocatedBytes);
                LOGGER.error("Current task status recorded is {}", PerQueryCPUMemResourceUsageAccountant.this._threadEntriesMap);
            }

            /**
             * Picks the query with the most CPU time and kills it if OOM killing is enabled. All messages
             * report the CPU nanoseconds that drove the choice (they previously printed allocated bytes).
             */
            private void killTopCpuConsumer() {
                AggregatedStats top = Collections.max(this._aggregatedUsagePerActiveQuery.values(), Comparator.comparingLong(AggregatedStats::getCpuNS));
                if (!this._oomKillQueryEnabled) {
                    LOGGER.warn("Query {} got picked because using {} ns of cpu time, actual kill committed false because oomKillQueryEnabled is false", top._queryId, top.getCpuNS());
                    return;
                }
                top._exceptionAtomicReference.set(new RuntimeException(String.format(" Query %s got killed because memory pressure, using %d ns of CPU time on %s: %s", top._queryId, top.getCpuNS(), this._instanceType, PerQueryCPUMemResourceUsageAccountant.this._instanceId)));
                interruptRunnerThread(top.getAnchorThread());
                LOGGER.error("Query {} got picked because using {} ns of cpu time, actual kill committed true", top._queryId, top.getCpuNS());
                LOGGER.error("Current task status recorded is {}", PerQueryCPUMemResourceUsageAccountant.this._threadEntriesMap);
            }

            /**
             * Kills every active query whose aggregated CPU time exceeds {@code _cpuTimeBasedKillingThresholdNS}.
             *
             * <p>The thread-entry map is dumped at ERROR only when at least one query was actually killed,
             * matching {@code killMostExpensiveQuery()}. This method runs on every watcher iteration while
             * CPU-based killing is the active level, so an unconditional dump would log an error each cycle.
             */
            private void killCPUTimeExceedQueries() {
                boolean killedAny = false;
                for (AggregatedStats stats : this._aggregatedUsagePerActiveQuery.values()) {
                    if (stats._cpuNS <= this._cpuTimeBasedKillingThresholdNS) {
                        continue;
                    }
                    LOGGER.error("Query {} got picked because using {} ns of cpu time, greater than threshold {}", stats._queryId, stats.getCpuNS(), this._cpuTimeBasedKillingThresholdNS);
                    stats._exceptionAtomicReference.set(new RuntimeException(String.format("Query %s got killed on %s: %s because using %d CPU time exceeding limit of %d ns CPU time", stats._queryId, this._instanceType, PerQueryCPUMemResourceUsageAccountant.this._instanceId, stats.getCpuNS(), this._cpuTimeBasedKillingThresholdNS)));
                    interruptRunnerThread(stats.getAnchorThread());
                    killedAny = true;
                }
                if (killedAny) {
                    LOGGER.error("Current task status recorded is {}", PerQueryCPUMemResourceUsageAccountant.this._threadEntriesMap);
                }
            }

            /**
             * Interrupts the given anchor thread, counts the kill on the query-killed meter when that metric is
             * enabled, and bumps the consecutive-kill counter.
             *
             * @param thread anchor thread of the query being killed
             */
            private void interruptRunnerThread(Thread thread) {
                thread.interrupt();
                boolean meterKills = this._isQueryKilledMetricEnabled;
                if (meterKills) {
                    this._metrics.addMeteredGlobalValue(this._queryKilledMeter, 1L);
                }
                this._numQueriesKilledConsecutively += 1;
            }
        }

        /**
         * Creates the accountant and its watcher task.
         *
         * <p>CPU and memory sampling are each enabled only when the configuration asks for them and
         * {@link ThreadResourceUsageProvider} reports the corresponding measurement as enabled. The watcher
         * task is created last because its constructor reads the configuration and the CPU sampling flag set here.
         *
         * @param pinotConfiguration configuration to read sampling flags and watcher thresholds from
         * @param str identifier of this instance, used in query-kill messages
         */
        public PerQueryCPUMemResourceUsageAccountant(PinotConfiguration pinotConfiguration, String str) {
            LOGGER.info("Initializing PerQueryCPUMemResourceUsageAccountant");
            this._config = pinotConfiguration;
            this._instanceId = str;

            boolean cpuMeasurementEnabled = ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled();
            boolean memoryMeasurementEnabled = ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled();
            LOGGER.info("threadCpuTimeMeasurementEnabled: {}, threadMemoryMeasurementEnabled: {}", cpuMeasurementEnabled, memoryMeasurementEnabled);

            boolean cpuSamplingConfig = pinotConfiguration.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_CPU_SAMPLING.booleanValue());
            boolean memorySamplingConfig = pinotConfiguration.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_MEMORY_SAMPLING.booleanValue());
            LOGGER.info("cpuSamplingConfig: {}, memorySamplingConfig: {}", cpuSamplingConfig, memorySamplingConfig);

            this._isThreadCPUSamplingEnabled = cpuSamplingConfig && cpuMeasurementEnabled;
            this._isThreadMemorySamplingEnabled = memorySamplingConfig && memoryMeasurementEnabled;
            LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", this._isThreadCPUSamplingEnabled, this._isThreadMemorySamplingEnabled);

            this._threadResourceUsageProvider = new ThreadLocal<>();
            this._inactiveQuery = new HashSet<>();
            this._watcherTask = new WatcherTask();
        }

        /**
         * Takes a fresh usage sample: allocated bytes first, then CPU time.
         * Each of the two samplers checks its own enable flag and does nothing when that kind of sampling is off.
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public void sampleUsage() {
            sampleThreadBytesAllocated();
            sampleThreadCPUTime();
        }

        /**
         * Reports how many thread entries are currently tracked.
         *
         * @return the size of the thread-entries map
         */
        public int getEntryCount() {
            return _threadEntriesMap.size();
        }

        /**
         * Adds the usage reported by this thread's {@link ThreadResourceUsageProvider} to the "concurrent"
         * aggregators under the given key. The CPU figure is added only when CPU sampling is enabled, and the
         * allocated-bytes figure only when memory sampling is enabled. A key that is not yet present starts at the
         * reported value.
         *
         * @param str key to accumulate under (the aggregators are looked up by query id elsewhere in this class)
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public void updateQueryUsageConcurrently(String str) {
            if (_isThreadCPUSamplingEnabled) {
                long cpuTimeNs = getThreadResourceUsageProvider().getThreadTimeNs();
                // merge() stores the value when the key is absent, otherwise sums it with the existing total.
                _concurrentTaskCPUStatsAggregator.merge(str, Long.valueOf(cpuTimeNs), Long::sum);
            }
            if (_isThreadMemorySamplingEnabled) {
                long allocatedBytes = getThreadResourceUsageProvider().getThreadAllocatedBytes();
                _concurrentTaskMemStatsAggregator.merge(str, Long.valueOf(allocatedBytes), Long::sum);
            }
        }

        /**
         * Stores the current thread-time reading in this thread's entry; does nothing when CPU sampling is disabled.
         * Note that the target field carries an {@code MS} suffix but receives the value of
         * {@code getThreadTimeNs()} unchanged.
         */
        public void sampleThreadCPUTime() {
            if (!_isThreadCPUSamplingEnabled) {
                return;
            }
            _threadLocalEntry.get()._currentThreadCPUTimeSampleMS = getThreadResourceUsageProvider().getThreadTimeNs();
        }

        /**
         * Stores the current allocated-bytes reading in this thread's entry; does nothing when memory sampling is
         * disabled.
         */
        public void sampleThreadBytesAllocated() {
            if (!_isThreadMemorySamplingEnabled) {
                return;
            }
            _threadLocalEntry.get()._currentThreadMemoryAllocationSampleBytes = getThreadResourceUsageProvider().getThreadAllocatedBytes();
        }

        /**
         * Looks up the usage provider bound to the calling thread.
         *
         * @return the provider held in the thread-local slot, or {@code null} when none has been set for this thread
         */
        private ThreadResourceUsageProvider getThreadResourceUsageProvider() {
            return _threadResourceUsageProvider.get();
        }

        /**
         * Binds a usage provider to the calling thread by storing it in the thread-local slot.
         *
         * @param threadResourceUsageProvider provider to use for subsequent samples taken on this thread
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public void setThreadResourceUsageProvider(ThreadResourceUsageProvider threadResourceUsageProvider) {
            _threadResourceUsageProvider.set(threadResourceUsageProvider);
        }

        /**
         * Resets this thread's error status and records which task the thread is now working on.
         * <ul>
         *   <li>With a parent context, the task is recorded under the parent's query id and anchor thread, using
         *       {@code i} as given.</li>
         *   <li>Without one, the task is recorded under {@code str} with {@code -1} in place of {@code i} and the
         *       calling thread as the anchor thread.</li>
         * </ul>
         *
         * @param str query id used when there is no parent context; checked for non-null only when assertions are on
         * @param i value forwarded to {@code setThreadTaskStatus} when a parent context is supplied
         * @param threadExecutionContext parent context to inherit query id and anchor thread from, or {@code null}
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant
        public void createExecutionContextInner(@Nullable String str, int i, @Nullable ThreadExecutionContext threadExecutionContext) {
            _threadLocalEntry.get()._errorStatus.set(null);
            if (threadExecutionContext == null) {
                // No parent: the calling thread becomes the anchor of the task it registers.
                if (!$assertionsDisabled && str == null) {
                    throw new AssertionError();
                }
                _threadLocalEntry.get().setThreadTaskStatus(str, -1, Thread.currentThread());
                return;
            }
            _threadLocalEntry.get().setThreadTaskStatus(threadExecutionContext.getQueryId(), i, threadExecutionContext.getAnchorThread());
        }

        /**
         * Returns the task status currently recorded in the calling thread's entry.
         *
         * @return whatever {@code getCurrentThreadTaskStatus()} reports for this thread's entry
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public ThreadExecutionContext getThreadExecutionContext() {
            ThreadExecutionContext currentTask = _threadLocalEntry.get().getCurrentThreadTaskStatus();
            return currentTask;
        }

        /**
         * Detaches the calling thread from its current task: marks the thread's entry idle, drops the thread's
         * usage provider, and then delegates to the superclass.
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public void clear() {
            // Task info held in the thread's entry.
            _threadLocalEntry.get().setToIdle();
            // Provider bound to this thread.
            _threadResourceUsageProvider.set(null);
            // Whatever state the base accountant keeps.
            super.clear();
        }

        /**
         * Submits this accountant's watcher task to the shared single-thread executor.
         * The returned future is not retained.
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public void startWatcherTask() {
            EXECUTOR_SERVICE.submit(_watcherTask);
        }

        /**
         * Purges aggregator entries for queries that are still marked inactive, then re-marks every query currently
         * present in the aggregators as a candidate for the next purge.
         *
         * <p>{@link #aggregate(boolean)} removes a query id from the inactive set whenever it finds a thread working
         * on that query, so only ids that stay in the set until the next call here are purged. Only the aggregators
         * belonging to an enabled kind of sampling are touched.
         */
        public void cleanInactive() {
            // Step 1: drop everything that is still marked inactive.
            if (_isThreadCPUSamplingEnabled) {
                for (String staleQueryId : _inactiveQuery) {
                    _finishedTaskCPUStatsAggregator.remove(staleQueryId);
                    _concurrentTaskCPUStatsAggregator.remove(staleQueryId);
                }
            }
            if (_isThreadMemorySamplingEnabled) {
                for (String staleQueryId : _inactiveQuery) {
                    _finishedTaskMemStatsAggregator.remove(staleQueryId);
                    _concurrentTaskMemStatsAggregator.remove(staleQueryId);
                }
            }

            // Step 2: start over with every remaining key as a purge candidate.
            _inactiveQuery.clear();
            if (_isThreadCPUSamplingEnabled) {
                _inactiveQuery.addAll(_finishedTaskCPUStatsAggregator.keySet());
                _inactiveQuery.addAll(_concurrentTaskCPUStatsAggregator.keySet());
            }
            if (_isThreadMemorySamplingEnabled) {
                _inactiveQuery.addAll(_finishedTaskMemStatsAggregator.keySet());
                _inactiveQuery.addAll(_concurrentTaskMemStatsAggregator.keySet());
            }
        }

        /**
         * Walks every tracked thread entry, folds the samples of finished tasks into the per-query "finished"
         * aggregators and, when requested, builds a per-query usage snapshot.
         *
         * <p>For each thread entry, in this order:
         * <ol>
         *   <li>the current CPU and memory samples are read (0 when that kind of sampling is disabled), followed by
         *       the current task;</li>
         *   <li>if the task differs from the one seen on the previous pass, the previous samples are added to the
         *       finished aggregators under the previous task's query id;</li>
         *   <li>the samples just read become the new "previous" samples;</li>
         *   <li>if the thread has a current task, its query id is removed from the inactive set and, when {@code z}
         *       is set, the samples are merged into the result for that query;</li>
         *   <li>the entry is removed from the map when its thread is no longer alive.</li>
         * </ol>
         * When {@code z} is set, each query in the result finally receives the totals held in the finished and
         * concurrent aggregators.
         *
         * @param z whether to build and return the per-query statistics
         * @return query id to aggregated stats, or {@code null} when {@code z} is {@code false}
         */
        public Map<String, AggregatedStats> aggregate(boolean z) {
            Map<String, AggregatedStats> statsByQuery = z ? new HashMap<>() : null;
            for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry> entry : _threadEntriesMap.entrySet()) {
                CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = entry.getValue();
                // Keep this read order: usage samples first, task status afterwards.
                long cpuSample = _isThreadCPUSamplingEnabled ? threadEntry._currentThreadCPUTimeSampleMS : 0L;
                long memSample = _isThreadMemorySamplingEnabled ? threadEntry._currentThreadMemoryAllocationSampleBytes : 0L;
                CPUMemThreadLevelAccountingObjects.TaskEntry currentTask = threadEntry.getCurrentThreadTaskStatus();
                Thread thread = entry.getKey();
                LOGGER.trace("tid: {}, task: {}", Long.valueOf(thread.getId()), currentTask);

                CPUMemThreadLevelAccountingObjects.TaskEntry previousTask = threadEntry._previousThreadTaskStatus;
                if (currentTask != previousTask) {
                    // The thread moved on to a different task (or went idle) since the last pass.
                    threadEntry._previousThreadTaskStatus = currentTask;
                    if (previousTask != null) {
                        String finishedQueryId = previousTask.getQueryId();
                        if (_isThreadCPUSamplingEnabled) {
                            _finishedTaskCPUStatsAggregator.merge(finishedQueryId, Long.valueOf(threadEntry._previousThreadCPUTimeSampleMS), Long::sum);
                        }
                        if (_isThreadMemorySamplingEnabled) {
                            _finishedTaskMemStatsAggregator.merge(finishedQueryId, Long.valueOf(threadEntry._previousThreadMemoryAllocationSampleBytes), Long::sum);
                        }
                    }
                }

                // Remember what was just read so the next pass can credit it if the task has changed by then.
                if (_isThreadCPUSamplingEnabled) {
                    threadEntry._previousThreadCPUTimeSampleMS = cpuSample;
                }
                if (_isThreadMemorySamplingEnabled) {
                    threadEntry._previousThreadMemoryAllocationSampleBytes = memSample;
                }

                if (currentTask != null) {
                    String activeQueryId = currentTask.getQueryId();
                    // A thread is working on this query, so it must not be purged by cleanInactive().
                    _inactiveQuery.remove(activeQueryId);
                    if (z) {
                        Thread anchorThread = currentTask.getAnchorThread();
                        boolean isAnchorThread = currentTask.isAnchorThread();
                        statsByQuery.compute(activeQueryId, (id, existing) -> existing == null
                                ? new AggregatedStats(cpuSample, memSample, anchorThread, isAnchorThread, threadEntry._errorStatus, activeQueryId)
                                : existing.merge(cpuSample, memSample, isAnchorThread, threadEntry._errorStatus));
                    }
                }

                // NOTE(review): removes from the map while iterating its entry set; this relies on
                // _threadEntriesMap tolerating concurrent modification -- confirm at its declaration.
                if (!thread.isAlive()) {
                    _threadEntriesMap.remove(thread);
                    LOGGER.info("Removing thread from _threadLocalEntry: {}", thread.getName());
                }
            }

            if (z) {
                // Add usage that is no longer attached to a live thread entry: finished tasks plus concurrent updates.
                for (Map.Entry<String, AggregatedStats> statsEntry : statsByQuery.entrySet()) {
                    String queryId = statsEntry.getKey();
                    long extraCpu = 0L;
                    long extraMem = 0L;
                    if (_isThreadCPUSamplingEnabled) {
                        extraCpu = _finishedTaskCPUStatsAggregator.getOrDefault(queryId, 0L).longValue()
                                + _concurrentTaskCPUStatsAggregator.getOrDefault(queryId, 0L).longValue();
                    }
                    if (_isThreadMemorySamplingEnabled) {
                        extraMem = _finishedTaskMemStatsAggregator.getOrDefault(queryId, 0L).longValue()
                                + _concurrentTaskMemStatsAggregator.getOrDefault(queryId, 0L).longValue();
                    }
                    statsEntry.getValue().merge(extraCpu, extraMem, false, null);
                }
            }
            return statsByQuery;
        }

        /**
         * Extension point that receives per-query aggregated statistics; this implementation does nothing.
         * NOTE(review): presumably called with the result of {@code aggregate(boolean)} -- confirm against callers.
         *
         * @param map per-query aggregated statistics, keyed by string
         */
        public void postAggregation(Map<String, AggregatedStats> map) {
            // Intentionally empty.
        }

        /**
         * Takes the error recorded in the calling thread's entry, resetting the stored value to {@code null} in the
         * same step, so a second call returns {@code null} unless a new error has been recorded in between.
         *
         * @return the recorded error, or {@code null} when none is set
         */
        @Override // org.apache.pinot.spi.trace.Tracing.DefaultThreadResourceUsageAccountant, org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant
        public Exception getErrorStatus() {
            Exception pendingError = _threadLocalEntry.get()._errorStatus.getAndSet(null);
            return pendingError;
        }

        // Class-wide state: assertion flag, memory MX bean, logger, cached debug flag, and the single-thread
        // executor on which the watcher task is run.
        static {
            $assertionsDisabled = !PerQueryCPUMemAccountantFactory.class.desiredAssertionStatus();
            MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
            LOGGER = LoggerFactory.getLogger(PerQueryCPUMemResourceUsageAccountant.class);
            // Captured once here; later changes to the logger level are not reflected.
            IS_DEBUG_MODE_ENABLED = LOGGER.isDebugEnabled();
            EXECUTOR_SERVICE = Executors.newFixedThreadPool(1, task -> {
                Thread watcherThread = new Thread(task);
                // Priority 4, i.e. one step below the normal priority.
                watcherThread.setPriority(Thread.NORM_PRIORITY - 1);
                // Daemon thread, so it does not keep the JVM alive on shutdown.
                watcherThread.setDaemon(true);
                watcherThread.setName(ACCOUNTANT_TASK_NAME);
                return watcherThread;
            });
        }
    }

    /**
     * Creates a new {@link PerQueryCPUMemResourceUsageAccountant}; every call returns a fresh instance.
     *
     * @param pinotConfiguration configuration handed to the accountant's constructor
     * @param str value handed to the accountant's constructor, where it is stored as the instance id
     * @return the newly created accountant
     */
    @Override // org.apache.pinot.spi.accounting.ThreadAccountantFactory
    public ThreadResourceUsageAccountant init(PinotConfiguration pinotConfiguration, String str) {
        return new PerQueryCPUMemResourceUsageAccountant(pinotConfiguration, str);
    }
}
