package org.apache.pinot.core.query.scheduler;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.com.google.common.util.concurrent.Futures;
import org.apache.pinot.$internal.com.google.common.util.concurrent.ListenableFuture;
import org.apache.pinot.$internal.com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.logger.ServerQueryLogger;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/core/query/scheduler/QueryScheduler.class */
/**
 * Base class for server-side query schedulers.
 *
 * <p>Subclasses implement {@link #submit(ServerQueryRequest)} and {@link #name()}. This class supplies the shared
 * helpers that run a query through the {@link QueryExecutor}, serialize the resulting
 * {@link InstanceResponseBlock} to bytes, and build immediate error responses.
 */
public abstract class QueryScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class);
    protected final ServerMetrics _serverMetrics;
    protected final QueryExecutor _queryExecutor;
    protected final ResourceManager _resourceManager;
    /** Receives {@link System#currentTimeMillis()} each time a query starts processing. */
    protected final LongAccumulator _latestQueryTime;
    protected final ServerQueryLogger _queryLogger = ServerQueryLogger.getInstance();
    /** Toggled by {@link #start()} and {@link #stop()}. */
    protected volatile boolean _isRunning = false;

    /**
     * Creates a scheduler. Every argument must be non-null.
     *
     * @param pinotConfiguration scheduler configuration; only null-checked here
     * @param queryExecutor executor that actually runs each query
     * @param resourceManager resource manager made available to subclasses
     * @param serverMetrics sink for server meters emitted by this class
     * @param longAccumulator accumulator updated with the start time of each processed query
     * @throws NullPointerException if any argument is null
     */
    public QueryScheduler(PinotConfiguration pinotConfiguration, QueryExecutor queryExecutor, ResourceManager resourceManager, ServerMetrics serverMetrics, LongAccumulator longAccumulator) {
        Preconditions.checkNotNull(pinotConfiguration);
        Preconditions.checkNotNull(queryExecutor);
        Preconditions.checkNotNull(resourceManager);
        Preconditions.checkNotNull(serverMetrics);
        Preconditions.checkNotNull(longAccumulator);
        this._serverMetrics = serverMetrics;
        this._resourceManager = resourceManager;
        this._queryExecutor = queryExecutor;
        this._latestQueryTime = longAccumulator;
    }

    /**
     * Submits a query for execution.
     *
     * @param serverQueryRequest the query to run
     * @return a future holding the serialized response bytes
     */
    public abstract ListenableFuture<byte[]> submit(ServerQueryRequest serverQueryRequest);

    /**
     * @return the scheduler name; passed to the query logger when a query is logged
     */
    public abstract String name();

    /** Marks the scheduler as running. */
    public void start() {
        this._isRunning = true;
    }

    /** Marks the scheduler as no longer running. */
    public void stop() {
        this._isRunning = false;
    }

    /**
     * Wraps the execution of a query in a {@link ListenableFutureTask}.
     *
     * <p>If a {@link QueryThreadContext} is initialized on the calling thread, a memento of it is captured here
     * and re-opened around the query when the task runs; otherwise {@code null} is passed to
     * {@link QueryThreadContext#open}.
     *
     * <p>Declared {@code public} in this file; the decompiler reports the original modifier was
     * {@code protected}.
     *
     * @param serverQueryRequest the query to run
     * @param executorService executor handed to the query executor
     * @return a task that yields the serialized response when run
     */
    public ListenableFutureTask<byte[]> createQueryFutureTask(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
        QueryThreadContext.Memento memento = QueryThreadContext.isInitialized() ? QueryThreadContext.createMemento() : null;
        return ListenableFutureTask.create(() -> {
            // try-with-resources closes the context exactly once, on both the normal and the exceptional path.
            try (QueryThreadContext.CloseableContext ignored = QueryThreadContext.open(memento)) {
                return processQueryAndSerialize(serverQueryRequest, executorService);
            }
        });
    }

    /**
     * Executes the query, serializes the response and enforces the optional maximum response size.
     *
     * <p>An exception thrown by the query executor is logged, counted as
     * {@link ServerMeter#UNCAUGHT_EXCEPTIONS} and turned into an {@link QueryErrorCode#INTERNAL} error response.
     * When the serialized response is larger than the limit returned by
     * {@link QueryOptionsUtils#getMaxServerResponseSizeBytes}, it is replaced by a serialized error response.
     *
     * @param serverQueryRequest the query to run
     * @param executorService executor handed to the query executor
     * @return the serialized response, or {@code null} if serialization failed
     */
    @Nullable
    protected byte[] processQueryAndSerialize(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
        Tracing.ThreadAccountantOps.setupRunner(serverQueryRequest.getQueryId());
        try {
            this._latestQueryTime.accumulate(System.currentTimeMillis());

            InstanceResponseBlock instanceResponse;
            try {
                instanceResponse = this._queryExecutor.execute(serverQueryRequest, executorService);
            } catch (Exception e) {
                LOGGER.error("Encountered exception while processing requestId {} from broker {}", serverQueryRequest.getRequestId(), serverQueryRequest.getBrokerId(), e);
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1L);
                instanceResponse = new InstanceResponseBlock();
                instanceResponse.addException(QueryErrorCode.INTERNAL, e.getMessage());
            }

            long requestId = serverQueryRequest.getRequestId();
            instanceResponse.getResponseMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
            byte[] responseBytes = serializeResponse(serverQueryRequest, instanceResponse);

            if (this._queryLogger != null) {
                this._queryLogger.logQuery(serverQueryRequest, instanceResponse, name());
            }

            Long maxResponseSizeBytes = QueryOptionsUtils.getMaxServerResponseSizeBytes(serverQueryRequest.getQueryContext().getQueryOptions());
            if (maxResponseSizeBytes != null && responseBytes != null && responseBytes.length > maxResponseSizeBytes) {
                // Report the broker id (not the response length) after "from broker".
                String errorMessage = "Serialized query response size " + responseBytes.length + " exceeds threshold " + maxResponseSizeBytes + " for requestId " + requestId + " from broker " + serverQueryRequest.getBrokerId();
                LOGGER.error(errorMessage);
                this._serverMetrics.addMeteredTableValue(serverQueryRequest.getTableNameWithType(), ServerMeter.LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS, 1L);

                // Replace the oversized payload with an error response carrying the same request id.
                InstanceResponseBlock errorResponse = new InstanceResponseBlock();
                errorResponse.addException(QueryErrorCode.QUERY_CANCELLATION, errorMessage);
                errorResponse.addMetadata(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
                responseBytes = serializeResponse(serverQueryRequest, errorResponse);
            }
            return responseBytes;
        } finally {
            // Always undo setupRunner(), whether the query succeeded or threw.
            Tracing.ThreadAccountantOps.clear();
        }
    }

    /**
     * Returns {@code true} when any argument exceeds its fixed threshold: {@code j > 100}, {@code j3 > 0} or
     * {@code j2 > 1000000}.
     *
     * <p>NOTE(review): not called from within this class, and the decompiled parameter names do not reveal
     * what each value measures - confirm against the original source before relying on it.
     */
    private boolean forceLog(long j, long j2, long j3) {
        return j > 100 || j3 > 0 || j2 > 1000000;
    }

    /**
     * Serializes a response block to bytes and records the serialization and total-query timers.
     *
     * <p>If building the data table is cut short by an {@link EarlyTerminationException}, a
     * {@link QueryErrorCode#QUERY_CANCELLATION} response is serialized instead via a recursive call. Any other
     * exception is logged and counted as {@link ServerMeter#RESPONSE_SERIALIZATION_EXCEPTIONS}, and
     * {@code null} is returned.
     *
     * @param serverQueryRequest the request the response belongs to
     * @param instanceResponseBlock the response to serialize
     * @return the serialized bytes, or {@code null} if serialization failed
     */
    @Nullable
    private byte[] serializeResponse(ServerQueryRequest serverQueryRequest, InstanceResponseBlock instanceResponseBlock) {
        TimerContext timerContext = serverQueryRequest.getTimerContext();
        TimerContext.Timer serializationTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.RESPONSE_SERIALIZATION);
        byte[] responseBytes = null;
        try {
            responseBytes = instanceResponseBlock.toDataTable().toBytes();
        } catch (EarlyTerminationException e) {
            Exception errorStatus = Tracing.getThreadAccountant().getErrorStatus();
            String errorMessage = "Cancelled while building data table" + (errorStatus == null ? "" : " " + errorStatus);
            LOGGER.error(errorMessage);
            InstanceResponseBlock cancelledResponse = new InstanceResponseBlock(new ExceptionResultsBlock(QueryErrorMessage.safeMsg(QueryErrorCode.QUERY_CANCELLATION, errorMessage)));
            cancelledResponse.addMetadata(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(serverQueryRequest.getRequestId()));
            // Returns directly: the timers below are recorded by the nested call, not by this one.
            return serializeResponse(serverQueryRequest, cancelledResponse);
        } catch (Exception e2) {
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1L);
            LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}", serverQueryRequest.getRequestId(), serverQueryRequest.getBrokerId(), e2);
        }
        serializationTimer.stopAndRecord();
        timerContext.startNewPhaseTimer(ServerQueryPhase.TOTAL_QUERY_TIME, timerContext.getQueryArrivalTimeMs()).stopAndRecord();
        return responseBytes;
    }

    /**
     * Builds an already-completed future holding a serialized error response for the request.
     *
     * @param serverQueryRequest the request being rejected
     * @param queryErrorCode error code to report; its default message is used as the exception message
     * @return a completed future with the serialized error response
     */
    protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest serverQueryRequest, QueryErrorCode queryErrorCode) {
        InstanceResponseBlock errorResponse = new InstanceResponseBlock();
        errorResponse.addMetadata(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(serverQueryRequest.getRequestId()));
        errorResponse.addException(queryErrorCode, queryErrorCode.getDefaultMessage());
        return Futures.immediateFuture(serializeResponse(serverQueryRequest, errorResponse));
    }

    /**
     * Returns an immediate {@link QueryErrorCode#SERVER_SHUTTING_DOWN} response for the request.
     *
     * <p>Declared {@code public} in this file; the decompiler reports the original modifier was
     * {@code protected}.
     */
    public ListenableFuture<byte[]> shuttingDown(ServerQueryRequest serverQueryRequest) {
        return immediateErrorResponse(serverQueryRequest, QueryErrorCode.SERVER_SHUTTING_DOWN);
    }

    /**
     * Returns an immediate {@link QueryErrorCode#SERVER_OUT_OF_CAPACITY} response for the request.
     *
     * <p>Declared {@code public} in this file; the decompiler reports the original modifier was
     * {@code protected}.
     */
    public ListenableFuture<byte[]> outOfCapacity(ServerQueryRequest serverQueryRequest) {
        return immediateErrorResponse(serverQueryRequest, QueryErrorCode.SERVER_OUT_OF_CAPACITY);
    }
}
