package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.calcite.sql.SqlKind;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.ExceptionUtils;
import org.apache.pinot.common.utils.Timer;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.class */
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);
    // Maximum number of segment names spelled out per table in an "unavailable segments" exception message.
    private static final int NUM_UNAVAILABLE_SEGMENTS_TO_LOG = 10;
    // Built in the constructor from the query runner hostname/port and the routing manager; handed to every
    // QueryEnvironment created while handling a request.
    private final WorkerManager _workerManager;
    // Submits planned queries and server-side explain requests; started in start() and shut down in shutDown().
    private final QueryDispatcher _queryDispatcher;
    // Fallback used for EXPLAIN when the query options do not say whether servers should be asked for their plan.
    // Read from the "pinot.query.multistage.explain.include.segment.plan" property (false when unset).
    private final boolean _explainAskingServerDefault;
    // A permit is acquired from this throttler before a query is dispatched and released once it completes.
    private final MultiStageQueryThrottler _queryThrottler;

    /*
     * Compiler-generated lookup table (originally MultiStageBrokerRequestHandler$1) that maps SqlKind ordinals to
     * dense switch labels: EXPLAIN -> 1, SELECT -> 2. Every other kind keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$calcite$sql$SqlKind = new int[SqlKind.values().length];

        static {
            // Each constant is registered in its own guarded block so that a constant missing from the SqlKind
            // version present at runtime only leaves its own slot unmapped.
            try {
                $SwitchMap$org$apache$calcite$sql$SqlKind[SqlKind.EXPLAIN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Intentionally ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$calcite$sql$SqlKind[SqlKind.SELECT.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Intentionally ignored: the slot stays 0.
            }
        }
    }

    /**
     * Creates the handler and wires up the multi-stage worker manager and query dispatcher.
     *
     * <p>The query runner hostname and port are read from {@code pinot.query.runner.hostname} and
     * {@code pinot.query.runner.port}. A TLS configuration is extracted from the {@code pinot.broker.tls} prefix only
     * when {@code pinot.multistage.engine.tls.enabled} is true; otherwise {@code null} is passed on.
     *
     * @param pinotConfiguration broker configuration, also forwarded to the base handler
     * @param str forwarded unchanged to the base handler constructor
     * @param brokerRoutingManager forwarded to the base handler constructor
     * @param accessControlFactory forwarded to the base handler constructor
     * @param queryQuotaManager forwarded to the base handler constructor
     * @param tableCache forwarded to the base handler constructor
     * @param multiStageQueryThrottler throttler consulted before each query is dispatched
     */
    public MultiStageBrokerRequestHandler(PinotConfiguration pinotConfiguration, String str, BrokerRoutingManager brokerRoutingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, MultiStageQueryThrottler multiStageQueryThrottler) {
        super(pinotConfiguration, str, brokerRoutingManager, accessControlFactory, queryQuotaManager, tableCache);
        String runnerHostname = pinotConfiguration.getProperty("pinot.query.runner.hostname");
        int runnerPort = Integer.parseInt(pinotConfiguration.getProperty("pinot.query.runner.port"));
        _workerManager = new WorkerManager(runnerHostname, runnerPort, _routingManager);

        TlsConfig tlsConfig = null;
        if (pinotConfiguration.getProperty("pinot.multistage.engine.tls.enabled", false)) {
            tlsConfig = TlsUtils.extractTlsConfig(pinotConfiguration, "pinot.broker.tls");
        }
        MailboxService mailboxService = new MailboxService(runnerHostname, runnerPort, pinotConfiguration, tlsConfig);
        _queryDispatcher = new QueryDispatcher(mailboxService, tlsConfig);

        LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, query log max length: {}, query log max rate: {}",
            runnerHostname, runnerPort, _brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
        _explainAskingServerDefault = _config.getProperty("pinot.query.multistage.explain.include.segment.plan", false);
        _queryThrottler = multiStageQueryThrottler;
    }

    /** Starts the underlying query dispatcher. */
    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public void start() {
        _queryDispatcher.start();
    }

    /** Shuts down the underlying query dispatcher. */
    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public void shutDown() {
        _queryDispatcher.shutdown();
    }

    /**
     * Plans a multi-stage query and either returns its explain plan or executes it.
     *
     * <p>For {@code EXPLAIN} statements the plan is produced and returned as a single-row result once table access
     * has been verified. Every other statement kind is planned, checked for table access and query quota, throttled
     * through the query throttler and then dispatched.
     *
     * <p>Failures are reported as follows:
     * <ul>
     *   <li>missing table access: a {@link WebApplicationException} with status {@code FORBIDDEN} is thrown;</li>
     *   <li>quota exceeded: a response carrying {@code QUOTA_EXCEEDED_ERROR} is returned;</li>
     *   <li>time-out or interruption while waiting for the throttler: a response carrying
     *       {@code EXECUTION_TIMEOUT_ERROR} is returned and the request error code is set to 250;</li>
     *   <li>{@link DatabaseConflictException}: a response carrying {@code QUERY_VALIDATION_ERROR} is returned and
     *       the error code is set to 700;</li>
     *   <li>any other {@link RuntimeException} raised before or around dispatch: a response carrying
     *       {@code UNKNOWN_COLUMN_ERROR} (error code 710) when the message matches the "column not found" pattern,
     *       otherwise {@code QUERY_PLANNING_ERROR} (error code 720).</li>
     * </ul>
     *
     * @param j request id
     * @param str SQL text of the query
     * @param sqlNodeAndOptions parsed statement together with its query options
     * @param jsonNode not used by this implementation
     * @param requesterIdentity identity used for authorization and query logging, may be {@code null}
     * @param requestContext per-request context that receives table names, error codes and statistics
     * @param httpHeaders headers used for database resolution and authorization
     * @param accessControl not used by this implementation; a fresh instance is obtained from the factory instead
     * @return the broker response for the query
     * @throws WebApplicationException when the requester lacks access to one of the queried tables
     */
    @Override // org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler
    protected BrokerResponse handleRequest(long j, String str, SqlNodeAndOptions sqlNodeAndOptions, JsonNode jsonNode, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, AccessControl accessControl) {
        LOGGER.debug("SQL query for request {}: {}", j, str);
        Map<String, String> queryOptions = sqlNodeAndOptions.getOptions();
        long compilationStartNs = System.nanoTime();
        try {
            Long timeoutOverrideMs = QueryOptionsUtils.getTimeoutMs(queryOptions);
            long queryTimeoutMs = timeoutOverrideMs != null ? timeoutOverrideMs : this._brokerTimeoutMs;
            String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders);
            QueryEnvironment queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
                .database(database)
                .tableCache(this._tableCache)
                .workerManager(this._workerManager)
                .defaultInferPartitionHint(this._config.getProperty("pinot.broker.multistage.infer.partition.hint", false))
                .build());

            if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) {
                boolean askServers = QueryOptionsUtils.isExplainAskingServers(queryOptions).orElse(this._explainAskingServerDefault);
                QueryEnvironment.QueryPlannerResult explainResult = queryEnvironment.explainQuery(str, sqlNodeAndOptions, j,
                    askServers ? fragment -> requestPhysicalPlan(fragment, requestContext, queryTimeoutMs, queryOptions) : null);
                String explainPlan = explainResult.getExplainPlan();
                // Table names are only known after planning, so authorization necessarily happens afterwards.
                TableAuthorizationResult explainAuthorization = hasTableAccess(requesterIdentity, explainResult.getTableNames(), requestContext, httpHeaders);
                if (!explainAuthorization.hasAccess()) {
                    throw newPermissionDeniedException(explainAuthorization);
                }
                return constructMultistageExplainPlan(str, explainPlan);
            }

            // Every statement kind other than EXPLAIN is planned and executed as a query.
            QueryEnvironment.QueryPlannerResult planResult = queryEnvironment.planQuery(str, sqlNodeAndOptions, j);
            DispatchableSubPlan queryPlan = planResult.getQueryPlan();
            Set<String> tableNames = planResult.getTableNames();
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.MULTI_STAGE_QUERIES_GLOBAL, 1L);
            for (String tableName : tableNames) {
                this._brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.MULTI_STAGE_QUERIES, 1L);
            }
            requestContext.setTableNames(List.copyOf(tableNames));
            updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, (System.nanoTime() - compilationStartNs) + sqlNodeAndOptions.getParseTimeNs());

            TableAuthorizationResult authorization = hasTableAccess(requesterIdentity, tableNames, requestContext, httpHeaders);
            if (!authorization.hasAccess()) {
                throw newPermissionDeniedException(authorization);
            }
            if (hasExceededQPSQuota(database, tableNames, requestContext)) {
                return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, String.format("Request %d: %s exceeds query quota.", j, str)));
            }

            // The timer starts before waiting on the throttler so the wait is deducted from the execution budget.
            Timer queryTimer = new Timer(queryTimeoutMs);
            try {
                if (!this._queryThrottler.tryAcquire(queryTimeoutMs, TimeUnit.MILLISECONDS)) {
                    LOGGER.warn("Timed out waiting to execute request {}: {}", j, str);
                    requestContext.setErrorCode(250);
                    return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
                }
                try {
                    Tracing.ThreadAccountantOps.setupRunner(String.valueOf(j), ThreadExecutionContext.TaskType.MSE);
                    return executeAndBuildResponse(j, str, requesterIdentity, requestContext, queryPlan, tableNames, queryTimer, queryOptions);
                } finally {
                    // The permit is released exactly once, whichever way execution ends.
                    this._queryThrottler.release();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupt received while waiting to execute request {}: {}", j, str);
                requestContext.setErrorCode(250);
                return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
            }
        } catch (DatabaseConflictException e) {
            LOGGER.info("{}. Request {}: {}", e.getMessage(), j, str);
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1L);
            requestContext.setErrorCode(700);
            return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
        } catch (WebApplicationException e) {
            // Must be handled before the generic RuntimeException handler so the HTTP status reaches the client.
            throw e;
        } catch (RuntimeException e) {
            String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(e);
            LOGGER.warn("Caught exception planning request {}: {}, {}", j, str, consolidatedMessage);
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1L);
            // Exceptions without a message (e.g. some NullPointerExceptions) are treated as generic planning errors.
            String rawMessage = e.getMessage();
            if (rawMessage != null && rawMessage.matches(".* Column .* not found in any table'")) {
                requestContext.setErrorCode(710);
                return new BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR, consolidatedMessage));
            }
            requestContext.setErrorCode(720);
            return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PLANNING_ERROR, consolidatedMessage));
        }
    }

    /** Builds the FORBIDDEN exception for a failed authorization, appending the failure reason when one is given. */
    private static WebApplicationException newPermissionDeniedException(TableAuthorizationResult authorizationResult) {
        String reason = authorizationResult.getFailureMessage();
        if (StringUtils.isNotBlank(reason)) {
            reason = "Reason: " + reason;
        }
        return new WebApplicationException("Permission denied. " + reason, Response.Status.FORBIDDEN);
    }

    /**
     * Dispatches an already planned query and converts the outcome into a broker response. Never throws: a time-out
     * yields {@code EXECUTION_TIMEOUT_ERROR} (error code 250) and any other failure, including one raised while
     * assembling the response, yields {@code QUERY_EXECUTION_ERROR} (error code 200).
     */
    private BrokerResponse executeAndBuildResponse(long requestId, String query, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, DispatchableSubPlan queryPlan, Set<String> tableNames, Timer queryTimer, Map<String, String> queryOptions) {
        long executionStartNs = System.nanoTime();
        try {
            QueryDispatcher.QueryResult queryResult;
            try {
                queryResult = this._queryDispatcher.submitAndReduce(requestContext, queryPlan, queryTimer.getRemainingTime(), queryOptions);
            } finally {
                // Thread accounting is cleared as soon as dispatch finishes, whether it succeeded or failed.
                Tracing.getThreadAccountant().clear();
            }
            updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, System.nanoTime() - executionStartNs);

            BrokerResponseNativeV2 response = new BrokerResponseNativeV2();
            response.setResultTable(queryResult.getResultTable());
            response.setTablesQueried(tableNames);
            response.setBrokerReduceTimeMs(queryResult.getBrokerReduceTimeMs());

            // Report unavailable segments per table as exceptions on an otherwise successful response.
            int numUnavailableSegments = 0;
            for (Map.Entry<String, Set<String>> entry : queryPlan.getTableToUnavailableSegmentsMap().entrySet()) {
                String tableName = entry.getKey();
                Set<String> unavailableSegments = entry.getValue();
                int segmentCount = unavailableSegments.size();
                numUnavailableSegments += segmentCount;
                response.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR, String.format("Found %d unavailable segments for table %s: %s", segmentCount, tableName, toSizeLimitedString(unavailableSegments, NUM_UNAVAILABLE_SEGMENTS_TO_LOG))));
            }
            requestContext.setNumUnavailableSegments(numUnavailableSegments);

            fillOldBrokerResponseStats(response, queryResult.getQueryStats(), queryPlan);
            response.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
            augmentStatistics(requestContext, response);
            if (QueryOptionsUtils.shouldDropResults(queryOptions)) {
                response.setResultTable((ResultTable) null);
            }
            this._queryLogger.log(new QueryLogger.QueryLogParams(requestContext, tableNames.toString(), response, requesterIdentity, null));
            return response;
        } catch (TimeoutException e) {
            for (String tableName : tableNames) {
                this._brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1L);
            }
            LOGGER.warn("Timed out executing request {}: {}", requestId, query);
            requestContext.setErrorCode(250);
            return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
        } catch (Throwable t) {
            String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
            LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
            requestContext.setErrorCode(200);
            return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
        }
    }

    /**
     * Asks the query dispatcher to explain the given plan fragment.
     *
     * @param dispatchablePlanFragment fragment to explain
     * @param requestContext context of the request being explained
     * @param j time-out value forwarded to the dispatcher
     * @param map query options forwarded to the dispatcher
     * @return the plan nodes returned by the dispatcher
     * @throws RuntimeException wrapping any exception raised by the dispatcher, with the fragment root's explain
     *     output in the message
     */
    private Collection<PlanNode> requestPhysicalPlan(DispatchablePlanFragment dispatchablePlanFragment, RequestContext requestContext, long j, Map<String, String> map) {
        Collection<PlanNode> physicalPlan;
        try {
            physicalPlan = _queryDispatcher.explain(requestContext, dispatchablePlanFragment, j, map);
        } catch (Exception cause) {
            String message = "Cannot obtain physical plan for fragment " + dispatchablePlanFragment.getPlanFragment().getFragmentRoot().explain();
            throw new RuntimeException(message, cause);
        }
        return physicalPlan;
    }

    /**
     * Attaches the per-stage statistics tree to the response and merges every non-null stage's stats into it.
     *
     * <p>Any exception raised while doing so is logged and replaced by a stage-stats object with a single
     * {@code "error"} entry, so stats collection never fails the query.
     *
     * @param brokerResponseNativeV2 response to populate
     * @param list per-stage statistics; entries may be {@code null}
     * @param dispatchableSubPlan plan whose stage list drives the statistics tree
     */
    private void fillOldBrokerResponseStats(BrokerResponseNativeV2 brokerResponseNativeV2, List<MultiStageQueryStats.StageStats.Closed> list, DispatchableSubPlan dispatchableSubPlan) {
        try {
            MultiStageStatsTreeBuilder treeBuilder = new MultiStageStatsTreeBuilder(dispatchableSubPlan.getQueryStageList(), list);
            brokerResponseNativeV2.setStageStats(treeBuilder.jsonStatsByStage(0));
            for (MultiStageQueryStats.StageStats.Closed stageStats : list) {
                if (stageStats == null) {
                    continue;
                }
                stageStats.forEach((type, statMap) -> type.mergeInto(brokerResponseNativeV2, statMap));
            }
        } catch (Exception failure) {
            LOGGER.warn("Error encountered while collecting multi-stage stats", failure);
            brokerResponseNativeV2.setStageStats(JsonNodeFactory.instance.objectNode().put("error", "Error encountered while collecting multi-stage stats - " + failure));
        }
    }

    /**
     * Checks whether the requester may query every table in {@code set}.
     *
     * <p>Two checks are combined: the identity-based {@code authorize} call and a per-table {@code hasAccess} call
     * using the HTTP headers. A table failing either check is reported as denied. On denial the access-error meter
     * is bumped and the request error code is set to 180. The elapsed time is recorded under the
     * {@code AUTHORIZATION} phase in all cases.
     *
     * @param requesterIdentity identity passed to the identity-based check
     * @param set names of the tables being queried
     * @param requestContext context that receives the error code on denial
     * @param httpHeaders headers passed to the per-table check
     * @return a successful result when no table was denied, otherwise a result listing the denied tables
     */
    private TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> set, RequestContext requestContext, HttpHeaders httpHeaders) {
        long authorizationStartNs = System.nanoTime();
        AccessControl accessControl = _accessControlFactory.create();
        TableAuthorizationResult identityResult = accessControl.authorize(requesterIdentity, set);

        Set<String> deniedTables = set.stream()
            .filter(tableName -> !accessControl.hasAccess(httpHeaders, TargetType.TABLE, tableName, "Query"))
            .collect(Collectors.toSet());
        deniedTables.addAll(identityResult.getFailedTables());

        TableAuthorizationResult combinedResult;
        if (deniedTables.isEmpty()) {
            combinedResult = TableAuthorizationResult.success();
        } else {
            combinedResult = new TableAuthorizationResult(deniedTables);
        }

        if (!combinedResult.hasAccess()) {
            _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1L);
            LOGGER.warn("Access denied for requestId {}", requestContext.getRequestId());
            requestContext.setErrorCode(180);
        }
        updatePhaseTimingForTables(set, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - authorizationStartNs);
        return combinedResult;
    }

    /**
     * Tries to acquire query quota for the database (when one is given) and then for each table, stopping at the
     * first refusal.
     *
     * <p>On refusal a warning is logged and the request error code is set to 429. A table-level refusal also bumps
     * the {@code QUERY_QUOTA_EXCEEDED} meter for the raw table name.
     *
     * @param str database name, or {@code null} to skip the database-level check
     * @param set names of the tables being queried
     * @param requestContext context that receives the error code on refusal
     * @return {@code true} when any quota was refused, {@code false} when all were acquired
     */
    private boolean hasExceededQPSQuota(@Nullable String str, Set<String> set, RequestContext requestContext) {
        boolean databaseQuotaRefused = str != null && !_queryQuotaManager.acquireDatabase(str);
        if (databaseQuotaRefused) {
            LOGGER.warn("Request {}: query exceeds quota for database: {}", requestContext.getRequestId(), str);
            requestContext.setErrorCode(429);
            return true;
        }
        for (String tableName : set) {
            if (_queryQuotaManager.acquire(tableName)) {
                continue;
            }
            LOGGER.warn("Request {}: query exceeds quota for table: {}", requestContext.getRequestId(), tableName);
            requestContext.setErrorCode(429);
            _brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName), BrokerMeter.QUERY_QUOTA_EXCEEDED, 1L);
            return true;
        }
        return false;
    }

    /**
     * Records the same phase duration against the raw table name of every given table.
     *
     * @param set names of the tables to record the timing for
     * @param brokerQueryPhase phase the duration belongs to
     * @param j duration value passed through to the metrics registry
     */
    private void updatePhaseTimingForTables(Set<String> set, BrokerQueryPhase brokerQueryPhase, long j) {
        for (String tableName : set) {
            String rawTableName = TableNameBuilder.extractRawTableName(tableName);
            _brokerMetrics.addPhaseTiming(rawTableName, brokerQueryPhase, j);
        }
    }

    /**
     * Wraps an explain plan in a broker response whose result table has the two string columns {@code SQL} and
     * {@code PLAN} and exactly one row.
     *
     * @param str the SQL text, placed in the {@code SQL} column
     * @param str2 the explain plan, placed in the {@code PLAN} column
     * @return an otherwise empty response carrying the single-row result table
     */
    private BrokerResponse constructMultistageExplainPlan(String str, String str2) {
        String[] columnNames = {"SQL", "PLAN"};
        DataSchema.ColumnDataType[] columnTypes = {DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING};
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[]{str, str2});

        BrokerResponseNative response = BrokerResponseNative.empty();
        response.setResultTable(new ResultTable(new DataSchema(columnNames, columnTypes), rows));
        return response;
    }

    /**
     * Not supported by this handler.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public Map<Long, String> getRunningQueries() {
        // A message makes the failure self-explanatory when it surfaces in logs or API responses.
        throw new UnsupportedOperationException("Listing running queries is not supported by MultiStageBrokerRequestHandler");
    }

    /**
     * Not supported by this handler; none of the arguments are inspected.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public boolean cancelQuery(long j, int i, Executor executor, HttpClientConnectionManager httpClientConnectionManager, Map<String, Integer> map) {
        // A message makes the failure self-explanatory when it surfaces in logs or API responses.
        throw new UnsupportedOperationException("Query cancellation is not supported by MultiStageBrokerRequestHandler");
    }

    /**
     * Renders at most {@code i} elements of the set as {@code [a, b, c]}. When the set holds more than {@code i}
     * elements the closing bracket is preceded by {@code ...} to signal truncation.
     *
     * @param set elements to render, in the set's iteration order
     * @param i maximum number of elements to include
     * @return the bracketed, comma-separated rendering
     */
    private static String toSizeLimitedString(Set<String> set, int i) {
        boolean fitsWithinLimit = set.size() <= i;
        String closing = fitsWithinLimit ? "]" : "...]";
        return set.stream()
            .limit(i)
            .collect(Collectors.joining(", ", "[", closing));
    }
}
