package org.apache.pinot.controller.api.resources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/")
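/*
 * JAX-RS resource exposing the controller's SQL query endpoints (GET and POST on the
 * RecommenderConstants.SQL path).
 * Decompiled source, loaded from: input_file:org/apache/pinot/controller/api/resources/PinotQueryResource.class
 */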
public class PinotQueryResource {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotQueryResource.class);

    /** Injected executor; this class uses it to run DML statements. */
    @Inject
    SqlQueryExecutor _sqlQueryExecutor;

    /** Injected Helix resource manager; queried here for table names/configs, broker instances and online instances. */
    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;

    /** Injected factory for the access-control objects consulted before a query is forwarded to a broker. */
    @Inject
    AccessControlFactory _accessControlFactory;

    /** Injected controller configuration; read here for the multi-stage engine flag and the broker protocol/port override. */
    @Inject
    ControllerConf _controllerConf;

    /*
     * Decompiler-recovered form of the compiler-generated helper class PinotQueryResource$1.
     * JADX INFO: access modifiers changed from: package-private.
     * Renamed from: org.apache.pinot.controller.api.resources.PinotQueryResource$1, reason: invalid class name.
     * It holds the lookup table used by the switch over PinotSqlType in executeSqlQuery.
     */
    /* loaded from: input_file:org/apache/pinot/controller/api/resources/PinotQueryResource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by PinotSqlType.ordinal(): DQL maps to 1 and DML maps to 2; every other constant keeps the
        // array default of 0 and therefore falls into the switch's default branch.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pinot$sql$parsers$PinotSqlType = new int[PinotSqlType.values().length];

        static {
            // Each slot is filled in its own try block so that an enum constant missing at run time
            // (NoSuchFieldError) only leaves that slot at 0 instead of failing class initialization.
            try {
                $SwitchMap$org$apache$pinot$sql$parsers$PinotSqlType[PinotSqlType.DQL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pinot$sql$parsers$PinotSqlType[PinotSqlType.DML.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Handles a SQL query submitted via POST as a JSON payload.
     *
     * <p>The payload must contain the query under the {@code RecommenderConstants.SQL} field; the optional
     * {@code trace} field defaults to {@code "false"} and the optional {@code queryOptions} field defaults to
     * {@code null}.
     *
     * <p>Only a failure to parse the payload is reported as a {@code JSON_PARSING} error. Exceptions raised while
     * executing the query (for example the {@link WebApplicationException} rethrown by
     * {@code executeSqlQueryCatching}) are no longer swallowed and re-labelled as JSON errors; they propagate
     * exactly as they do for the GET endpoint.
     *
     * @param requestJsonStr raw JSON request body
     * @param httpHeaders headers of the incoming request
     * @return a streaming response carrying either the query result or an error response
     */
    @POST
    @ManualAuthorization
    @Path(RecommenderConstants.SQL)
    public StreamingOutput handlePostSql(String requestJsonStr, @Context HttpHeaders httpHeaders) {
        JsonNode requestJson;
        try {
            requestJson = JsonUtils.stringToJsonNode(requestJsonStr);
        } catch (Exception e) {
            return constructQueryExceptionResponse(QueryErrorCode.JSON_PARSING, e.getMessage());
        }
        // A parser may hand back null for an empty body; treat that the same as a payload without 'sql'.
        if (requestJson == null || !requestJson.has(RecommenderConstants.SQL)) {
            return constructQueryExceptionResponse(QueryErrorCode.JSON_PARSING, "JSON Payload is missing the query string field 'sql'");
        }
        String sqlQuery = requestJson.get(RecommenderConstants.SQL).asText();
        String traceEnabled = requestJson.has("trace") ? requestJson.get("trace").toString() : "false";
        String queryOptions = requestJson.has("queryOptions") ? requestJson.get("queryOptions").asText() : null;
        return executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
    }

    /**
     * Handles a SQL query submitted via GET, with the query, trace flag and query options passed as
     * query parameters. Delegates to {@code executeSqlQueryCatching}.
     *
     * @param sqlQuery value of the {@code sql} query parameter
     * @param traceEnabled value of the {@code trace} query parameter
     * @param queryOptions value of the {@code queryOptions} query parameter
     * @param httpHeaders headers of the incoming request
     * @return a streaming response carrying either the query result or an error response
     */
    @GET
    @ManualAuthorization
    @Path(RecommenderConstants.SQL)
    public StreamingOutput handleGetSql(
            @QueryParam("sql") String sqlQuery,
            @QueryParam("trace") String traceEnabled,
            @QueryParam("queryOptions") String queryOptions,
            @Context HttpHeaders httpHeaders) {
        StreamingOutput response = executeSqlQueryCatching(httpHeaders, sqlQuery, traceEnabled, queryOptions);
        return response;
    }

    /**
     * Runs {@code executeSqlQuery} and converts failures into error responses.
     *
     * <p>The specific exception types are caught before the generic {@link Exception} handler; with the
     * generic handler listed first the {@link WebApplicationException} clause was unreachable, so such
     * exceptions could never be rethrown as intended.
     *
     * @param httpHeaders headers of the incoming request
     * @param sqlQuery SQL text to execute
     * @param traceEnabled trace flag as received from the client
     * @param queryOptions extra query options as received from the client, may be {@code null}
     * @return the query response, or an error response describing the failure
     * @throws WebApplicationException rethrown unchanged so the JAX-RS runtime can map it to an HTTP status
     */
    private StreamingOutput executeSqlQueryCatching(HttpHeaders httpHeaders, String sqlQuery, String traceEnabled, String queryOptions) {
        try {
            return executeSqlQuery(httpHeaders, sqlQuery, traceEnabled, queryOptions);
        } catch (QueryException e) {
            LOGGER.warn("Caught exception while processing get request {}", e.getMessage());
            return constructQueryExceptionResponse(e.getErrorCode(), e.getMessage());
        } catch (ProcessingException e) {
            LOGGER.error("Caught exception while processing get request {}", e.getMessage());
            return constructQueryExceptionResponse(QueryErrorCode.fromErrorCode(e.getErrorCode()), e.getMessage());
        } catch (WebApplicationException e) {
            LOGGER.error("Caught exception while processing get request", e);
            throw e;
        } catch (Exception e) {
            // Boundary handler: anything unexpected is reported to the client as an internal error.
            LOGGER.error("Caught unknown exception while processing get request", e);
            return constructQueryExceptionResponse(QueryErrorCode.INTERNAL, e.getMessage());
        }
    }

    /**
     * Compiles the SQL text and dispatches it by statement type.
     *
     * <p>DQL statements are forwarded to a broker, through the multi-stage path when the
     * {@code useMultistageEngine} option is true and through the regular path otherwise. DML statements are
     * executed through the injected {@code SqlQueryExecutor} and the result is streamed back.
     *
     * <p>The switch is written directly over {@link PinotSqlType} instead of going through the synthetic
     * ordinal lookup table, and the DML output stream is closed with try-with-resources.
     *
     * @param httpHeaders headers of the incoming request
     * @param sqlQuery SQL text to execute
     * @param traceEnabled trace flag as received from the client
     * @param queryOptions extra query options as received from the client, may be {@code null}
     * @return a streaming response for the query
     * @throws Exception if compilation or dispatch fails; also thrown when the multi-stage engine is requested
     *     but disabled, or when the statement type is neither DQL nor DML
     */
    private StreamingOutput executeSqlQuery(@Context HttpHeaders httpHeaders, String sqlQuery, String traceEnabled, @Nullable String queryOptions) throws Exception {
        LOGGER.debug("Trace: {}, Running query: {}", traceEnabled, sqlQuery);
        SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
        // NOTE(review): the options map is fetched before the extra options are applied; the flag lookup below
        // only sees them if setExtraOptions writes into this same map - confirm against SqlNodeAndOptions.
        Map<String, String> options = sqlNodeAndOptions.getOptions();
        if (queryOptions != null) {
            sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromString(queryOptions));
        }
        boolean useMultiStageEngine = Boolean.parseBoolean(options.get("useMultistageEngine"));
        boolean multiStageEngineEnabled = _controllerConf.getProperty("pinot.multistage.engine.enabled", true);
        if (useMultiStageEngine && !multiStageEngineEnabled) {
            throw QueryErrorCode.INTERNAL.asException("V2 Multi-Stage query engine not enabled.");
        }
        PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
        switch (sqlType) {
            case DQL:
                return useMultiStageEngine
                        ? getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders, traceEnabled)
                        : getQueryResponse(sqlQuery, sqlNodeAndOptions.getSqlNode(), traceEnabled, queryOptions, httpHeaders);
            case DML: {
                // Only the first value of each non-empty request header is passed along with the statement.
                Map<String, String> headers = httpHeaders.getRequestHeaders().entrySet().stream()
                        .filter(entry -> !entry.getValue().isEmpty())
                        .map(entry -> Pair.of(entry.getKey(), entry.getValue().get(0)))
                        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
                return outputStream -> {
                    try (OutputStream out = outputStream) {
                        _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions, headers).toOutputStream(out);
                    }
                };
            }
            default:
                throw QueryErrorCode.INTERNAL.asException("Unsupported SQL type - " + sqlType);
        }
    }

    /**
     * Forwards a multi-stage query to a broker after checking READ access on {@code "/sql"}.
     *
     * <p>The options embedded in the query are merged with the explicitly supplied query options to resolve
     * the database, the tables referenced by the query are looked up, and one of the candidate brokers is
     * picked at random.
     *
     * @param query SQL text to execute
     * @param queryOptions extra query options as received from the client, may be {@code null}
     * @param httpHeaders headers of the incoming request
     * @param traceEnabled trace flag as received from the client
     * @return a streaming response that relays the broker's answer
     * @throws WebApplicationException with status FORBIDDEN when the access check fails
     */
    private StreamingOutput getMultiStageQueryResponse(String query, String queryOptions, HttpHeaders httpHeaders, String traceEnabled) {
        boolean readAllowed = _accessControlFactory.create().hasAccess(AccessType.READ, httpHeaders, "/sql");
        if (!readAllowed) {
            throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
        }

        Map<String, String> mergedOptions = RequestUtils.parseQuery(query).getOptions();
        if (queryOptions != null) {
            mergedOptions.putAll(RequestUtils.getOptionsFromString(queryOptions));
        }
        String database = DatabaseUtils.extractDatabaseFromQueryRequest(mergedOptions, httpHeaders);

        List<String> tableNames = getTableNames(query, database);
        List<String> candidateBrokers = getInstanceIds(query, tableNames, database);
        String brokerInstanceId = selectRandomInstanceId(candidateBrokers);
        return sendRequestToBroker(query, brokerInstanceId, traceEnabled, queryOptions, httpHeaders);
    }

    /**
     * Compiles the query with a {@link QueryEnvironment} for the given database and returns the names of the
     * tables it references.
     *
     * <p>{@link QueryException} is handled before the generic {@link Exception} clause; with the generic
     * clause first the specific handler was unreachable and the original error code could never be preserved.
     * The compiled query is closed with try-with-resources.
     *
     * @param query SQL text to compile
     * @param database database the query runs against
     * @return a new list holding the table names reported by the compiled query
     * @throws QueryException with the original error code when it is not {@code UNKNOWN}, otherwise with
     *     {@code SQL_PARSING}; any other failure is also reported as {@code SQL_PARSING}
     */
    private List<String> getTableNames(String query, String database) {
        // Exceptions thrown while creating the environment or compiling are handled by the catch clauses below.
        try (QueryEnvironment.CompiledQuery compiledQuery =
                new QueryEnvironment(database, _pinotHelixResourceManager.getTableCache(), (WorkerManager) null).compile(query)) {
            return new ArrayList<>(compiledQuery.getTableNames());
        } catch (QueryException e) {
            if (e.getErrorCode() != QueryErrorCode.UNKNOWN) {
                throw e;
            }
            throw new QueryException(QueryErrorCode.SQL_PARSING, e);
        } catch (Exception e) {
            throw QueryErrorCode.SQL_PARSING.asException("Unable to find table for this query", e);
        }
    }

    /**
     * Determines which broker instances may serve a query over the given tables.
     *
     * <p>When no table name is known, every broker instance is a candidate. Otherwise the candidates are the
     * brokers tagged for all broker tenants of the tables' configs.
     *
     * @param query SQL text, used for logging only
     * @param tableNames tables referenced by the query
     * @param database database the tables belong to
     * @return candidate broker instance ids
     * @throws QueryException when the tables, their broker tenants, or a common broker cannot be found
     */
    private List<String> getInstanceIds(String query, List<String> tableNames, String database) {
        if (tableNames.isEmpty()) {
            List<String> allBrokers = _pinotHelixResourceManager.getAllBrokerInstances();
            LOGGER.error("Unable to find table name from SQL {} thus dispatching to random broker.", query);
            return allBrokers;
        }

        List<TableConfig> tableConfigs = getListTableConfigs(tableNames, database);
        boolean noConfigs = tableConfigs == null || tableConfigs.isEmpty();
        if (noConfigs) {
            throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException("Unable to find table in cluster, table does not exist");
        }

        Set<String> brokerTenants = getBrokerTenantsUnion(tableConfigs);
        if (brokerTenants.isEmpty()) {
            throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find broker tenant for tables: " + tableNames);
        }

        List<String> commonBrokers = findCommonBrokerInstances(brokerTenants);
        if (!commonBrokers.isEmpty()) {
            return commonBrokers;
        }
        LOGGER.error("Unable to find a common broker instance for table tenants. Tables: {}, Tenants: {}", tableNames, brokerTenants);
        throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException(
                "Unable to find a common broker instance for table tenants. Tables: " + tableNames + ", Tenants: " + brokerTenants);
    }

    /**
     * Forwards a single-stage query to a broker serving the queried table.
     *
     * <p>Steps: resolve the database from the merged query options and headers, resolve the actual table
     * name, check READ access on the raw table name, then send the request to a randomly chosen broker of
     * that table.
     *
     * <p>Each try block covers only the step whose failure it describes. Previously the catch-all around
     * table-name resolution also enclosed the access check and the broker dispatch, so an access-denied or
     * broker-selection failure was logged as a compilation problem and re-labelled {@code SQL_PARSING}.
     *
     * @param query SQL text to execute
     * @param sqlNode parsed statement, or {@code null} to compile the table name from {@code query}
     * @param traceEnabled trace flag as received from the client
     * @param queryOptions extra query options as received from the client, may be {@code null}
     * @param httpHeaders headers of the incoming request
     * @return a streaming response that relays the broker's answer
     * @throws QueryException {@code QUERY_VALIDATION} on a database conflict, {@code SQL_PARSING} when the
     *     table name cannot be resolved, {@code ACCESS_DENIED} when the access check fails
     */
    private StreamingOutput getQueryResponse(String query, @Nullable SqlNode sqlNode, String traceEnabled, String queryOptions, HttpHeaders httpHeaders) {
        Map<String, String> mergedOptions = RequestUtils.parseQuery(query).getOptions();
        if (queryOptions != null) {
            mergedOptions.putAll(RequestUtils.getOptionsFromString(queryOptions));
        }

        String database;
        try {
            database = DatabaseUtils.extractDatabaseFromQueryRequest(mergedOptions, httpHeaders);
        } catch (DatabaseConflictException e) {
            throw QueryErrorCode.QUERY_VALIDATION.asException(e);
        }

        String tableName;
        try {
            String inputTableName = sqlNode != null
                    ? (String) RequestUtils.getTableNames(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode)).iterator().next()
                    : CalciteSqlCompiler.compileToBrokerRequest(query).getQuerySource().getTableName();
            tableName = _pinotHelixResourceManager.getActualTableName(inputTableName, database);
        } catch (Exception e) {
            LOGGER.error("Caught exception while compiling query: {}", query, e);
            // Give the user a hint when the query would compile with the multi-stage engine.
            if (ParserUtils.canCompileWithMultiStageEngine(query, database, _pinotHelixResourceManager.getTableCache())) {
                throw QueryErrorCode.SQL_PARSING.asException("It seems that the query is only supported by the multi-stage query engine, please retry the query using the multi-stage query engine (https://docs.pinot.apache.org/developers/advanced/v2-multi-stage-query-engine)");
            }
            throw QueryErrorCode.SQL_PARSING.asException(e);
        }

        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        if (!_accessControlFactory.create().hasAccess(rawTableName, AccessType.READ, httpHeaders, Constants.QUERY_TAG)) {
            throw QueryErrorCode.ACCESS_DENIED.asException();
        }

        List<String> brokerInstanceIds = _pinotHelixResourceManager.getBrokerInstancesFor(rawTableName);
        return sendRequestToBroker(query, selectRandomInstanceId(brokerInstanceIds), traceEnabled, queryOptions, httpHeaders);
    }

    /**
     * Collects the realtime and offline table configs of every given table.
     *
     * @param tableNames table names to resolve
     * @param database database used to resolve each actual table name
     * @return the configs of all tables, or {@code null} as soon as one table has neither a realtime nor an
     *     offline table (callers check for this)
     */
    private List<TableConfig> getListTableConfigs(List<String> tableNames, String database) {
        List<TableConfig> allConfigs = new ArrayList<>();
        for (String tableName : tableNames) {
            String actualTableName = _pinotHelixResourceManager.getActualTableName(tableName, database);
            boolean foundAny = false;
            if (_pinotHelixResourceManager.hasRealtimeTable(actualTableName)) {
                TableConfig realtimeConfig = _pinotHelixResourceManager.getRealtimeTableConfig(actualTableName);
                allConfigs.add(Objects.requireNonNull(realtimeConfig));
                foundAny = true;
            }
            if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
                TableConfig offlineConfig = _pinotHelixResourceManager.getOfflineTableConfig(actualTableName);
                allConfigs.add(Objects.requireNonNull(offlineConfig));
                foundAny = true;
            }
            if (!foundAny) {
                // A table without any config invalidates the whole lookup.
                return null;
            }
        }
        return allConfigs;
    }

    /**
     * Picks one online instance at random from the given candidates.
     *
     * <p>The candidate list is no longer modified: the online candidates are gathered into a fresh list
     * instead of calling {@code retainAll} on the argument, which altered the caller's list and would fail
     * on an unmodifiable one. Membership is tested against a {@link HashSet} of the online instances.
     *
     * @param instanceIds candidate instance ids
     * @return the id of a randomly chosen candidate that is currently online
     * @throws QueryException {@code BROKER_RESOURCE_MISSING} when there are no candidates,
     *     {@code BROKER_INSTANCE_MISSING} when none of them is online
     */
    private String selectRandomInstanceId(List<String> instanceIds) {
        if (instanceIds.isEmpty()) {
            throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException("No broker found for query");
        }
        Set<String> onlineInstances = new HashSet<>(_pinotHelixResourceManager.getOnlineInstanceList());
        List<String> onlineCandidates = new ArrayList<>(instanceIds.size());
        for (String instanceId : instanceIds) {
            if (onlineInstances.contains(instanceId)) {
                onlineCandidates.add(instanceId);
            }
        }
        if (onlineCandidates.isEmpty()) {
            throw QueryErrorCode.BROKER_INSTANCE_MISSING.asException("No online broker found for query");
        }
        return onlineCandidates.get(ThreadLocalRandom.current().nextInt(onlineCandidates.size()));
    }

    /**
     * Returns the names of the broker instances that carry the broker tag of every given tenant.
     *
     * @param brokerTenants broker tenant names; when empty, every broker instance qualifies
     * @return instance names of the brokers tagged for all tenants
     */
    private List<String> findCommonBrokerInstances(Set<String> brokerTenants) {
        List<InstanceConfig> brokerConfigs = _pinotHelixResourceManager.getAllBrokerInstanceConfigs();
        return brokerConfigs.stream()
                .filter(brokerConfig -> brokerTenants.stream()
                        .allMatch(tenant -> brokerConfig.containsTag(TagNameUtils.getBrokerTagForTenant(tenant))))
                .map(InstanceConfig::getInstanceName)
                .collect(Collectors.toList());
    }

    /**
     * Gathers the distinct broker tenant names referenced by the given table configs.
     *
     * @param tableConfigs table configs to inspect
     * @return a new set with the broker tenant of every config
     */
    private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigs) {
        return tableConfigs.stream()
                .map(tableConfig -> tableConfig.getTenantConfig().getBroker())
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * Builds the broker URL, JSON payload and forwarded headers for a query and hands them to
     * {@code sendRequestRaw}.
     *
     * @param query SQL text to execute
     * @param instanceId Helix instance id of the broker to contact
     * @param traceEnabled trace flag to include in the payload
     * @param queryOptions query options to include in the payload
     * @param httpHeaders headers of the incoming request; the first value of each non-empty header is forwarded
     * @return a streaming response that relays the broker's answer
     * @throws QueryException {@code INTERNAL} when no instance config exists for {@code instanceId}
     */
    private StreamingOutput sendRequestToBroker(String query, String instanceId, String traceEnabled, String queryOptions, HttpHeaders httpHeaders) {
        InstanceConfig brokerConfig = _pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
        if (brokerConfig == null) {
            LOGGER.error("Instance {} not found", instanceId);
            throw QueryErrorCode.INTERNAL.asException();
        }

        // Strip the instance-id prefix when the configured host name still carries it.
        String brokerHost = brokerConfig.getHostName();
        if (brokerHost.startsWith("Broker_")) {
            brokerHost = brokerHost.substring(CommonConstants.Helix.BROKER_INSTANCE_PREFIX_LENGTH);
        }

        String protocol = _controllerConf.getControllerBrokerProtocol();
        int portOverride = _controllerConf.getControllerBrokerPortOverride();
        int brokerPort = portOverride > 0 ? portOverride : Integer.parseInt(brokerConfig.getPort());
        String queryUrl = getQueryURL(protocol, brokerHost, brokerPort);

        ObjectNode requestJson = getRequestJson(query, traceEnabled, queryOptions);
        Map<String, String> forwardedHeaders = httpHeaders.getRequestHeaders().entrySet().stream()
                .filter(header -> !header.getValue().isEmpty())
                .map(header -> Pair.of(header.getKey(), header.getValue().get(0)))
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        return sendRequestRaw(queryUrl, query, requestJson, forwardedHeaders);
    }

    /**
     * Builds the JSON payload sent to the broker.
     *
     * @param query SQL text, always included
     * @param traceEnabled trace flag; included only when non-null and non-empty
     * @param queryOptions query options; included only when non-null and non-empty
     * @return the payload object
     */
    private ObjectNode getRequestJson(String query, String traceEnabled, String queryOptions) {
        ObjectNode payload = JsonUtils.newObjectNode();
        payload.put(RecommenderConstants.SQL, query);

        boolean hasTrace = !(traceEnabled == null || traceEnabled.isEmpty());
        if (hasTrace) {
            payload.put("trace", traceEnabled);
        }

        boolean hasOptions = !(queryOptions == null || queryOptions.isEmpty());
        if (hasOptions) {
            payload.put("queryOptions", queryOptions);
        }
        return payload;
    }

    /**
     * Builds the broker query endpoint URL {@code <protocol>://<host>:<port>/query/sql}.
     *
     * <p>Plain concatenation is used instead of {@code String.format("%d")}: the formatter localizes digits
     * according to the default locale, which could put non-ASCII digits into the port of the URL.
     *
     * @param protocol URL scheme
     * @param hostName broker host name
     * @param port broker port
     * @return the query URL
     */
    private String getQueryURL(String protocol, String hostName, int port) {
        return protocol + "://" + hostName + ":" + port + "/query/sql";
    }

    /**
     * POSTs the request body to the given URL and copies the response body to {@code outputStream}.
     *
     * <p>The connection is now always disconnected in a {@code finally} block. Previously the variable
     * checked on the failure path was never assigned, so a failed request left its connection open. The
     * request and response streams are closed with try-with-resources.
     *
     * @param urlStr target URL
     * @param requestStr request body, sent as UTF-8
     * @param headers extra request headers to set, may be {@code null} or empty
     * @param outputStream destination for the response body; not closed by this method
     * @throws WebApplicationException with status FORBIDDEN when the remote side answers 403
     */
    public void sendPostRaw(String urlStr, String requestStr, Map<String, String> headers, OutputStream outputStream) {
        HttpURLConnection connection = null;
        try {
            LOGGER.info("url string passed is : {}", urlStr);
            connection = (HttpURLConnection) new URL(urlStr).openConnection();
            connection.setDoOutput(true);
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Accept-Encoding", "gzip");
            byte[] requestBytes = requestStr.getBytes(StandardCharsets.UTF_8);
            connection.setRequestProperty("Content-Length", String.valueOf(requestBytes.length));
            connection.setRequestProperty("http.keepAlive", String.valueOf(true));
            // NOTE(review): the header name is taken from an unrelated segment-completion constant; this looks
            // like a decompiler constant substitution - confirm the intended literal.
            connection.setRequestProperty(SegmentCompletionConfig.DEFAULT_FSM_SCHEME, String.valueOf(true));
            if (headers != null && !headers.isEmpty()) {
                for (Map.Entry<String, String> header : headers.entrySet()) {
                    connection.setRequestProperty(header.getKey(), header.getValue());
                }
            }

            try (OutputStream requestBody = new BufferedOutputStream(connection.getOutputStream())) {
                requestBody.write(requestBytes);
                requestBody.flush();
            }

            int responseCode = connection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_FORBIDDEN) {
                throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
            }
            if (responseCode != HttpURLConnection.HTTP_OK) {
                InputStream errorStream = connection.getErrorStream();
                throw new IOException("Failed : HTTP error code : " + responseCode + ". Root Cause: " + (errorStream != null ? IOUtils.toString(errorStream, StandardCharsets.UTF_8) : "Unknown"));
            }
            try (InputStream responseBody = connection.getInputStream()) {
                IOUtils.copy(responseBody, outputStream);
            }
        } catch (Exception e) {
            LOGGER.error("Caught exception while sending query request", e);
            Utils.rethrowException(e);
            throw new AssertionError("Should not reach this");
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * Wraps a broker request into a {@link StreamingOutput}; the request is only sent when the returned
     * object is asked to write, and the elapsed time is logged afterwards.
     *
     * @param url broker query URL
     * @param query SQL text, used for logging only
     * @param requestJson JSON payload, serialized when the output is written
     * @param headers request headers to send along
     * @return a streaming response that relays the broker's answer
     */
    public StreamingOutput sendRequestRaw(String url, String query, ObjectNode requestJson, Map<String, String> headers) {
        return new StreamingOutput() {
            @Override
            public void write(OutputStream responseStream) throws IOException, WebApplicationException {
                final long startMs = System.currentTimeMillis();
                sendPostRaw(url, requestJson.toString(), headers, responseStream);
                final long elapsedMs = System.currentTimeMillis() - startMs;
                LOGGER.info("Query: {} Time: {}", query, elapsedMs);
            }
        };
    }

    /**
     * Creates a streaming response that writes a {@link BrokerResponseNative} built from the given error
     * code and message, closing the output stream afterwards.
     *
     * @param queryErrorCode error code to report
     * @param message error message to report
     * @return a streaming response carrying the error
     */
    private static StreamingOutput constructQueryExceptionResponse(QueryErrorCode queryErrorCode, String message) {
        return outputStream -> {
            try (OutputStream out = outputStream) {
                BrokerResponseNative errorResponse = new BrokerResponseNative(queryErrorCode, message);
                errorResponse.toOutputStream(out);
            }
        };
    }

    /**
     * Convenience overload that unpacks the error code and user message of a {@link QueryErrorMessage}.
     *
     * @param queryErrorMessage error description to report
     * @return a streaming response carrying the error
     */
    private static StreamingOutput constructQueryExceptionResponse(QueryErrorMessage queryErrorMessage) {
        QueryErrorCode errorCode = queryErrorMessage.getErrCode();
        String userMessage = queryErrorMessage.getUsrMsg();
        return constructQueryExceptionResponse(errorCode, userMessage);
    }
}
