package org.apache.pinot.broker.requesthandler;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.query.service.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.class */
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MultiStageBrokerRequestHandler.class);
    private static final long DEFAULT_TIMEOUT_NANO = 10000000000L;
    private final String _reducerHostname;
    private final int _reducerPort;
    private final MailboxService<Mailbox.MailboxContent> _mailboxService;
    private final QueryEnvironment _queryEnvironment;
    private final QueryDispatcher _queryDispatcher;

    public MultiStageBrokerRequestHandler(PinotConfiguration pinotConfiguration, BrokerRoutingManager brokerRoutingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) {
        super(pinotConfiguration, brokerRoutingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
        LOGGER.info("Using Multi-stage BrokerRequestHandler.");
        String property = pinotConfiguration.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
        if (property == null) {
            String property2 = pinotConfiguration.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
            String substring = property2.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? property2.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : property2;
            property = StringUtils.split(substring, "_").length > 1 ? StringUtils.split(substring, "_")[0] : substring;
        }
        this._reducerHostname = property;
        this._reducerPort = pinotConfiguration.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 0);
        this._queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()), CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)), new WorkerManager(this._reducerHostname, this._reducerPort, brokerRoutingManager));
        this._queryDispatcher = new QueryDispatcher();
        this._mailboxService = new GrpcMailboxService(this._reducerHostname, this._reducerPort, pinotConfiguration);
        this._mailboxService.start();
    }

    @Override // org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler, org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public BrokerResponse handleRequest(JsonNode jsonNode, @Nullable SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception {
        long incrementAndGet = this._requestIdGenerator.incrementAndGet();
        requestContext.setBrokerId(this._brokerId);
        requestContext.setRequestId(incrementAndGet);
        requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
        if (!this._accessControlFactory.create().hasAccess(requesterIdentity)) {
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1L);
            LOGGER.info("Access denied for requestId {}", Long.valueOf(incrementAndGet));
            requestContext.setErrorCode(180);
            return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
        }
        JsonNode jsonNode2 = jsonNode.get("sql");
        if (jsonNode2 == null) {
            throw new BadQueryRequestException("Failed to find 'sql' in the request: " + jsonNode);
        }
        String asText = jsonNode2.asText();
        requestContext.setQuery(asText);
        return handleRequest(incrementAndGet, asText, sqlNodeAndOptions, jsonNode, requesterIdentity, requestContext);
    }

    private BrokerResponseNative handleRequest(long j, String str, @Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode jsonNode, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception {
        SqlNodeAndOptions parseQuery;
        LOGGER.debug("SQL query for request {}: {}", Long.valueOf(j), str);
        if (sqlNodeAndOptions != null) {
            parseQuery = sqlNodeAndOptions;
        } else {
            try {
                parseQuery = RequestUtils.parseQuery(str, jsonNode);
            } catch (Exception e) {
                LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", Long.valueOf(j), str, e.getMessage());
                this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1L);
                requestContext.setErrorCode(150);
                return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
            }
        }
        SqlNodeAndOptions sqlNodeAndOptions2 = parseQuery;
        long nanoTime = System.nanoTime();
        switch (sqlNodeAndOptions2.getSqlNode().getKind()) {
            case EXPLAIN:
                return constructMultistageExplainPlan(str, this._queryEnvironment.explainQuery(str, sqlNodeAndOptions2));
            case SELECT:
            default:
                try {
                    List<DataTable> submitAndReduce = this._queryDispatcher.submitAndReduce(j, this._queryEnvironment.planQuery(str, sqlNodeAndOptions2), this._mailboxService, 10000000000L);
                    BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
                    long millis = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions2.getParseTimeNs() + (System.nanoTime() - nanoTime));
                    brokerResponseNative.setTimeUsedMs(millis);
                    brokerResponseNative.setResultTable(toResultTable(submitAndReduce));
                    requestContext.setQueryProcessingTime(millis);
                    augmentStatistics(requestContext, brokerResponseNative);
                    return brokerResponseNative;
                } catch (Exception e2) {
                    LOGGER.info("query execution failed", (Throwable) e2);
                    return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e2));
                }
        }
    }

    private BrokerResponseNative constructMultistageExplainPlan(String str, String str2) {
        BrokerResponseNative empty = BrokerResponseNative.empty();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Object[]{str, str2});
        empty.setResultTable(new ResultTable(new DataSchema(new String[]{"SQL", AvaticaConnection.PLAN_COLUMN_NAME}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING}), arrayList));
        return empty;
    }

    @Override // org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler
    protected BrokerResponseNative processBrokerRequest(long j, BrokerRequest brokerRequest, BrokerRequest brokerRequest2, @Nullable BrokerRequest brokerRequest3, @Nullable Map<ServerInstance, List<String>> map, @Nullable BrokerRequest brokerRequest4, @Nullable Map<ServerInstance, List<String>> map2, long j2, BaseBrokerRequestHandler.ServerStats serverStats, RequestContext requestContext) throws Exception {
        throw new UnsupportedOperationException();
    }

    private ResultTable toResultTable(List<DataTable> list) {
        DataSchema dataSchema = null;
        ArrayList arrayList = new ArrayList();
        for (DataTable dataTable : list) {
            dataSchema = dataSchema == null ? dataTable.getDataSchema() : dataSchema;
            int length = dataSchema.getColumnNames().length;
            DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
            ArrayList arrayList2 = new ArrayList(dataTable.getNumberOfRows());
            for (int i = 0; i < dataTable.getNumberOfRows(); i++) {
                Object[] objArr = new Object[length];
                Object[] extractRowFromDataTable = SelectionOperatorUtils.extractRowFromDataTable(dataTable, i);
                for (int i2 = 0; i2 < length; i2++) {
                    objArr[i2] = columnDataTypes[i2].convertAndFormat(extractRowFromDataTable[i2]);
                }
                arrayList2.add(objArr);
            }
            arrayList.addAll(arrayList2);
        }
        return new ResultTable(dataSchema, arrayList);
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public void start() {
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public void shutDown() {
        this._queryDispatcher.shutdown();
        this._mailboxService.shutdown();
    }
}
