package org.apache.pinot.core.query.scheduler;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pinot.$internal.com.google.common.util.concurrent.ListenableFuture;
import org.apache.pinot.$internal.com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.pinot.$internal.com.google.common.util.concurrent.MoreExecutors;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager;
import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.class */
public class BinaryWorkloadScheduler extends QueryScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BinaryWorkloadScheduler.class);
    public static final String MAX_SECONDARY_QUERIES = "binarywlm.maxSecondaryRunnerThreads";
    public static final int DEFAULT_MAX_SECONDARY_QUERIES = 5;
    private final int _numSecondaryRunners;
    private final Semaphore _secondaryRunnerSemaphore;
    private final SecondaryWorkloadQueue _secondaryQueryQ;
    Thread _scheduler;

    public BinaryWorkloadScheduler(PinotConfiguration pinotConfiguration, QueryExecutor queryExecutor, ServerMetrics serverMetrics, LongAccumulator longAccumulator) {
        super(pinotConfiguration, queryExecutor, new BinaryWorkloadResourceManager(pinotConfiguration), serverMetrics, longAccumulator);
        this._secondaryQueryQ = new SecondaryWorkloadQueue(pinotConfiguration, this._resourceManager);
        this._numSecondaryRunners = pinotConfiguration.getProperty(MAX_SECONDARY_QUERIES, 5);
        LOGGER.info("numSecondaryRunners={}", Integer.valueOf(this._numSecondaryRunners));
        this._secondaryRunnerSemaphore = new Semaphore(this._numSecondaryRunners);
    }

    @Override // org.apache.pinot.core.query.scheduler.QueryScheduler
    public String name() {
        return "BinaryWorkloadScheduler";
    }

    @Override // org.apache.pinot.core.query.scheduler.QueryScheduler
    public ListenableFuture<byte[]> submit(ServerQueryRequest serverQueryRequest) {
        if (!this._isRunning) {
            return immediateErrorResponse(serverQueryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR);
        }
        serverQueryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
        if (!QueryOptionsUtils.isSecondaryWorkload(serverQueryRequest.getQueryContext().getQueryOptions())) {
            ListenableFutureTask<byte[]> createQueryFutureTask = createQueryFutureTask(serverQueryRequest, this._resourceManager.getExecutorService(serverQueryRequest, null));
            this._resourceManager.getQueryRunners().submit((Runnable) createQueryFutureTask);
            return createQueryFutureTask;
        }
        SchedulerQueryContext schedulerQueryContext = new SchedulerQueryContext(serverQueryRequest);
        try {
            this._serverMetrics.addMeteredTableValue(serverQueryRequest.getTableNameWithType(), ServerMeter.NUM_SECONDARY_QUERIES, 1L);
            this._secondaryQueryQ.put(schedulerQueryContext);
            return schedulerQueryContext.getResultFuture();
        } catch (OutOfCapacityException e) {
            LOGGER.error("Out of capacity for query {} table {}, message: {}", Long.valueOf(serverQueryRequest.getRequestId()), serverQueryRequest.getTableNameWithType(), e.getMessage());
            return immediateErrorResponse(serverQueryRequest, QueryException.SERVER_OUT_OF_CAPACITY_ERROR);
        } catch (Exception e2) {
            LOGGER.error("Internal error for query {} table {}, message {}", Long.valueOf(serverQueryRequest.getRequestId()), serverQueryRequest.getTableNameWithType(), e2.getMessage());
            return immediateErrorResponse(serverQueryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR);
        }
    }

    @Override // org.apache.pinot.core.query.scheduler.QueryScheduler
    public void start() {
        super.start();
        this._scheduler = getScheduler();
        this._scheduler.setName(YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
        this._scheduler.setPriority(10);
        this._scheduler.setDaemon(true);
        this._scheduler.start();
    }

    private Thread getScheduler() {
        return new Thread(new Runnable() { // from class: org.apache.pinot.core.query.scheduler.BinaryWorkloadScheduler.1
            @Override // java.lang.Runnable
            public void run() {
                while (BinaryWorkloadScheduler.this._isRunning) {
                    try {
                        BinaryWorkloadScheduler.this._secondaryRunnerSemaphore.acquire();
                        try {
                            final SchedulerQueryContext take = BinaryWorkloadScheduler.this._secondaryQueryQ.take();
                            if (take != null) {
                                ServerQueryRequest queryRequest = take.getQueryRequest();
                                final QueryExecutorService executorService = BinaryWorkloadScheduler.this._resourceManager.getExecutorService(queryRequest, take.getSchedulerGroup());
                                ListenableFutureTask<byte[]> createQueryFutureTask = BinaryWorkloadScheduler.this.createQueryFutureTask(queryRequest, executorService);
                                createQueryFutureTask.addListener(new Runnable() { // from class: org.apache.pinot.core.query.scheduler.BinaryWorkloadScheduler.1.1
                                    @Override // java.lang.Runnable
                                    public void run() {
                                        executorService.releaseWorkers();
                                        take.getSchedulerGroup().endQuery();
                                        BinaryWorkloadScheduler.this._secondaryRunnerSemaphore.release();
                                        BinaryWorkloadScheduler.this.checkStopResourceManager();
                                    }
                                }, MoreExecutors.directExecutor());
                                BinaryWorkloadScheduler.this.updateSecondaryWorkloadMetrics(queryRequest);
                                take.setResultFuture(createQueryFutureTask);
                                take.getSchedulerGroup().startQuery();
                                BinaryWorkloadScheduler.this._resourceManager.getQueryRunners().submit((Runnable) createQueryFutureTask);
                            }
                        } catch (Throwable th) {
                            BinaryWorkloadScheduler.LOGGER.error("Error in scheduler thread. This is indicative of a bug. Please report this. Server will continue with errors", th);
                        }
                    } catch (InterruptedException e) {
                        if (BinaryWorkloadScheduler.this._isRunning) {
                            BinaryWorkloadScheduler.LOGGER.error("Interrupt while acquiring semaphore. Exiting.", (Throwable) e);
                        } else {
                            BinaryWorkloadScheduler.LOGGER.info("Shutting down scheduler");
                        }
                    }
                }
                if (BinaryWorkloadScheduler.this._isRunning) {
                    throw new RuntimeException("FATAL: Scheduler thread is quitting.....something went horribly wrong.....!!!");
                }
                BinaryWorkloadScheduler.this.failAllPendingQueries();
            }
        });
    }

    private void updateSecondaryWorkloadMetrics(ServerQueryRequest serverQueryRequest) {
        this._serverMetrics.addTimedTableValue(serverQueryRequest.getTableNameWithType(), ServerTimer.SECONDARY_Q_WAIT_TIME_MS, System.currentTimeMillis() - serverQueryRequest.getTimerContext().getQueryArrivalTimeMs(), TimeUnit.MILLISECONDS);
        this._serverMetrics.addMeteredTableValue(serverQueryRequest.getTableNameWithType(), ServerMeter.NUM_SECONDARY_QUERIES_SCHEDULED, 1L);
    }

    @Override // org.apache.pinot.core.query.scheduler.QueryScheduler
    public void stop() {
        super.stop();
        if (this._scheduler != null) {
            this._scheduler.interrupt();
        }
    }

    private void checkStopResourceManager() {
        if (this._isRunning || this._secondaryRunnerSemaphore.availablePermits() != this._numSecondaryRunners) {
            return;
        }
        this._resourceManager.stop();
    }

    private synchronized void failAllPendingQueries() {
        for (SchedulerQueryContext schedulerQueryContext : this._secondaryQueryQ.drain()) {
            schedulerQueryContext.setResultFuture(immediateErrorResponse(schedulerQueryContext.getQueryRequest(), QueryException.SERVER_SCHEDULER_DOWN_ERROR));
        }
    }
}
