package org.apache.pinot.controller.api.resources;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.type.TypeFactory;
import org.apache.pinot.query.type.TypeSystem;
import org.apache.pinot.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.apache.pinot.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlCompilationException;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/")
/* loaded from: input_file:org/apache/pinot/controller/api/resources/PinotQueryResource.class */
public class PinotQueryResource {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) PinotQueryResource.class);
    private static final Random RANDOM = new Random();

    @Inject
    SqlQueryExecutor _sqlQueryExecutor;

    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;

    @Inject
    AccessControlFactory _accessControlFactory;

    @Inject
    ControllerConf _controllerConf;

    @POST
    @ManualAuthorization
    @Path("sql")
    public String handlePostSql(String str, @Context HttpHeaders httpHeaders) {
        try {
            JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(str);
            String asText = stringToJsonNode.get("sql").asText();
            String jsonNode = stringToJsonNode.has(CommonConstants.Broker.Request.TRACE) ? stringToJsonNode.get(CommonConstants.Broker.Request.TRACE).toString() : "false";
            String str2 = null;
            if (stringToJsonNode.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
                str2 = stringToJsonNode.get(CommonConstants.Broker.Request.QUERY_OPTIONS).asText();
            }
            LOGGER.debug("Trace: {}, Running query: {}", jsonNode, asText);
            return executeSqlQuery(httpHeaders, asText, jsonNode, str2, "/sql");
        } catch (WebApplicationException e) {
            LOGGER.error("Caught exception while processing post request", (Throwable) e);
            throw e;
        } catch (ProcessingException e2) {
            LOGGER.error("Caught exception while processing post request {}", e2.getMessage());
            return e2.getMessage();
        } catch (Exception e3) {
            LOGGER.error("Caught exception while processing post request", (Throwable) e3);
            return QueryException.getException(QueryException.INTERNAL_ERROR, e3).toString();
        }
    }

    @GET
    @ManualAuthorization
    @Path("sql")
    public String handleGetSql(@QueryParam("sql") String str, @QueryParam("trace") String str2, @QueryParam("queryOptions") String str3, @Context HttpHeaders httpHeaders) {
        try {
            LOGGER.debug("Trace: {}, Running query: {}", str2, str);
            return executeSqlQuery(httpHeaders, str, str2, str3, "/sql");
        } catch (WebApplicationException e) {
            LOGGER.error("Caught exception while processing get request", (Throwable) e);
            throw e;
        } catch (ProcessingException e2) {
            LOGGER.error("Caught exception while processing get request {}", e2.getMessage());
            return e2.getMessage();
        } catch (Exception e3) {
            LOGGER.error("Caught exception while processing get request", (Throwable) e3);
            return QueryException.getException(QueryException.INTERNAL_ERROR, e3).toString();
        }
    }

    private String executeSqlQuery(@Context HttpHeaders httpHeaders, String str, String str2, @Nullable String str3, String str4) throws Exception {
        try {
            SqlNodeAndOptions compileToSqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(str);
            Map<String, String> options = compileToSqlNodeAndOptions.getOptions();
            if (str3 != null) {
                compileToSqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromString(str3));
            }
            if (Boolean.parseBoolean(options.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
                if (this._controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true)) {
                    return getMultiStageQueryResponse(str, str3, httpHeaders, str4, str2);
                }
                throw new UnsupportedOperationException("V2 Multi-Stage query engine not enabled. Please see https://docs.pinot.apache.org/ for instruction to enable V2 engine.");
            }
            PinotSqlType sqlType = compileToSqlNodeAndOptions.getSqlType();
            switch (sqlType) {
                case DQL:
                    return getQueryResponse(str, compileToSqlNodeAndOptions.getSqlNode(), str2, str3, httpHeaders);
                case DML:
                    return this._sqlQueryExecutor.executeDMLStatement(compileToSqlNodeAndOptions, (Map) httpHeaders.getRequestHeaders().entrySet().stream().filter(entry -> {
                        return !((List) entry.getValue()).isEmpty();
                    }).map(entry2 -> {
                        return Pair.of((String) entry2.getKey(), (String) ((List) entry2.getValue()).get(0));
                    }).collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, (v0) -> {
                        return v0.getValue();
                    }))).toJsonString();
                default:
                    throw new UnsupportedOperationException("Unsupported SQL type - " + sqlType);
            }
        } catch (SqlCompilationException e) {
            throw QueryException.getException(QueryException.SQL_PARSING_ERROR, e);
        }
    }

    private String getMultiStageQueryResponse(String str, String str2, HttpHeaders httpHeaders, String str3, String str4) {
        List<String> allBrokerInstances;
        if (!this._accessControlFactory.create().hasAccess((String) null, AccessType.READ, httpHeaders, str3)) {
            throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
        }
        List<String> tableNamesForQuery = new QueryEnvironment(new TypeFactory(new TypeSystem()), CalciteSchemaBuilder.asRootSchema(new PinotCatalog(this._pinotHelixResourceManager.getTableCache())), null, null).getTableNamesForQuery(str);
        if (tableNamesForQuery.size() != 0) {
            List<TableConfig> listTableConfigs = getListTableConfigs(tableNamesForQuery);
            if (listTableConfigs == null || listTableConfigs.size() == 0) {
                return QueryException.getException(QueryException.TABLE_DOES_NOT_EXIST_ERROR, new Exception("Unable to find table in cluster, table does not exist")).toString();
            }
            Set<String> brokerTenantsUnion = getBrokerTenantsUnion(listTableConfigs);
            if (brokerTenantsUnion.isEmpty()) {
                return QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR, new Exception(String.format("Unable to dispatch multistage query for tables: [%s]", tableNamesForQuery))).toString();
            }
            allBrokerInstances = findCommonBrokerInstances(brokerTenantsUnion);
        } else {
            allBrokerInstances = this._pinotHelixResourceManager.getAllBrokerInstances();
            LOGGER.error("Unable to find table name from SQL {} thus dispatching to random broker.", str);
        }
        return sendRequestToBroker(str, selectRandomInstanceId(allBrokerInstances), str4, str2, httpHeaders);
    }

    private String getQueryResponse(String str, @Nullable SqlNode sqlNode, String str2, String str3, HttpHeaders httpHeaders) {
        try {
            String extractRawTableName = TableNameBuilder.extractRawTableName(this._pinotHelixResourceManager.getActualTableName(sqlNode != null ? RequestUtils.getTableNames(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode)).iterator().next() : CalciteSqlCompiler.compileToBrokerRequest(str).getQuerySource().getTableName()));
            return !this._accessControlFactory.create().hasDataAccess(httpHeaders, extractRawTableName) ? QueryException.ACCESS_DENIED_ERROR.toString() : sendRequestToBroker(str, selectRandomInstanceId(this._pinotHelixResourceManager.getBrokerInstancesFor(extractRawTableName)), str2, str3, httpHeaders);
        } catch (Exception e) {
            LOGGER.error("Caught exception while compiling query: {}", str, e);
            try {
                LOGGER.info("Trying to compile query {} using multi-stage engine", str);
                new QueryEnvironment(new TypeFactory(new TypeSystem()), CalciteSchemaBuilder.asRootSchema(new PinotCatalog(this._pinotHelixResourceManager.getTableCache())), null, null).getTableNamesForQuery(str);
                LOGGER.info("Successfully compiled query using multi-stage engine: {}", str);
                return QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception("It seems that the query is only supported by the multi-stage engine, please try it by checking the \"Use Multi-Stage Engine\" box above")).toString();
            } catch (Exception e2) {
                LOGGER.error("Caught exception while compiling query using multi-stage engine: {}", str, e2);
                return QueryException.getException(QueryException.SQL_PARSING_ERROR, e).toString();
            }
        }
    }

    private List<TableConfig> getListTableConfigs(List<String> list) {
        ArrayList arrayList = new ArrayList();
        for (String str : list) {
            ArrayList arrayList2 = new ArrayList();
            if (this._pinotHelixResourceManager.hasRealtimeTable(str)) {
                arrayList2.add((TableConfig) Objects.requireNonNull(this._pinotHelixResourceManager.getRealtimeTableConfig(str)));
            }
            if (this._pinotHelixResourceManager.hasOfflineTable(str)) {
                arrayList2.add((TableConfig) Objects.requireNonNull(this._pinotHelixResourceManager.getOfflineTableConfig(str)));
            }
            if (arrayList2.size() == 0) {
                return null;
            }
            arrayList.addAll(arrayList2);
        }
        return arrayList;
    }

    private String selectRandomInstanceId(List<String> list) {
        if (list.isEmpty()) {
            return QueryException.BROKER_RESOURCE_MISSING_ERROR.toString();
        }
        list.retainAll(this._pinotHelixResourceManager.getOnlineInstanceList());
        return list.isEmpty() ? QueryException.BROKER_INSTANCE_MISSING_ERROR.toString() : list.get(RANDOM.nextInt(list.size()));
    }

    private List<String> findCommonBrokerInstances(Set<String> set) {
        Stream<InstanceConfig> stream = this._pinotHelixResourceManager.getAllBrokerInstanceConfigs().stream();
        for (String str : set) {
            stream = stream.filter(instanceConfig -> {
                return instanceConfig.containsTag(TagNameUtils.getBrokerTagForTenant(str));
            });
        }
        return (List) stream.map((v0) -> {
            return v0.getInstanceName();
        }).collect(Collectors.toList());
    }

    private Set<String> getBrokerTenantsUnion(List<TableConfig> list) {
        HashSet hashSet = new HashSet();
        Iterator<TableConfig> it2 = list.iterator();
        while (it2.hasNext()) {
            hashSet.add(it2.next().getTenantConfig().getBroker());
        }
        return hashSet;
    }

    private String sendRequestToBroker(String str, String str2, String str3, String str4, HttpHeaders httpHeaders) {
        InstanceConfig helixInstanceConfig = this._pinotHelixResourceManager.getHelixInstanceConfig(str2);
        if (helixInstanceConfig == null) {
            LOGGER.error("Instance {} not found", str2);
            return QueryException.INTERNAL_ERROR.toString();
        }
        String hostName = helixInstanceConfig.getHostName();
        if (hostName.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
            hostName = hostName.substring(CommonConstants.Helix.BROKER_INSTANCE_PREFIX_LENGTH);
        }
        return sendRequestRaw(getQueryURL(this._controllerConf.getControllerBrokerProtocol(), hostName, this._controllerConf.getControllerBrokerPortOverride() > 0 ? this._controllerConf.getControllerBrokerPortOverride() : Integer.parseInt(helixInstanceConfig.getPort())), str, getRequestJson(str, str3, str4), (Map) httpHeaders.getRequestHeaders().entrySet().stream().filter(entry -> {
            return !((List) entry.getValue()).isEmpty();
        }).map(entry2 -> {
            return Pair.of((String) entry2.getKey(), (String) ((List) entry2.getValue()).get(0));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private ObjectNode getRequestJson(String str, String str2, String str3) {
        ObjectNode newObjectNode = JsonUtils.newObjectNode();
        newObjectNode.put("sql", str);
        if (str2 != null && !str2.isEmpty()) {
            newObjectNode.put(CommonConstants.Broker.Request.TRACE, str2);
        }
        if (str3 != null && !str3.isEmpty()) {
            newObjectNode.put(CommonConstants.Broker.Request.QUERY_OPTIONS, str3);
        }
        return newObjectNode;
    }

    private String getQueryURL(String str, String str2, int i) {
        return String.format("%s://%s:%d/query/sql", str, str2, Integer.valueOf(i));
    }

    public String sendPostRaw(String str, String str2, Map<String, String> map) {
        HttpURLConnection httpURLConnection = null;
        try {
            try {
                LOGGER.info("url string passed is : " + str);
                HttpURLConnection httpURLConnection2 = (HttpURLConnection) new URL(str).openConnection();
                httpURLConnection2.setDoOutput(true);
                httpURLConnection2.setRequestMethod("POST");
                httpURLConnection2.setRequestProperty("Accept-Encoding", "gzip");
                byte[] bytes = str2.getBytes(StandardCharsets.UTF_8);
                httpURLConnection2.setRequestProperty("Content-Length", String.valueOf(bytes.length));
                httpURLConnection2.setRequestProperty("http.keepAlive", String.valueOf(true));
                httpURLConnection2.setRequestProperty("default", String.valueOf(true));
                if (map != null && !map.isEmpty()) {
                    for (Map.Entry<String, String> entry : map.entrySet()) {
                        httpURLConnection2.setRequestProperty(entry.getKey(), entry.getValue());
                    }
                }
                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(httpURLConnection2.getOutputStream());
                bufferedOutputStream.write(bytes);
                bufferedOutputStream.flush();
                bufferedOutputStream.close();
                int responseCode = httpURLConnection2.getResponseCode();
                if (responseCode == 403) {
                    throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
                }
                if (responseCode != 200) {
                    InputStream errorStream = httpURLConnection2.getErrorStream();
                    throw new IOException("Failed : HTTP error code : " + responseCode + ". Root Cause: " + (errorStream != null ? IOUtils.toString(errorStream, StandardCharsets.UTF_8) : "Unknown"));
                }
                String str3 = new String(drain(new BufferedInputStream(httpURLConnection2.getInputStream())), StandardCharsets.UTF_8);
                if (httpURLConnection2 != null) {
                    httpURLConnection2.disconnect();
                }
                return str3;
            } catch (Exception e) {
                LOGGER.error("Caught exception while sending query request", (Throwable) e);
                Utils.rethrowException(e);
                throw new AssertionError("Should not reach this");
            }
        } catch (Throwable th) {
            if (0 != 0) {
                httpURLConnection.disconnect();
            }
            throw th;
        }
    }

    byte[] drain(InputStream inputStream) throws IOException {
        try {
            byte[] bArr = new byte[1024];
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            while (true) {
                int read = inputStream.read(bArr);
                if (read <= 0) {
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    inputStream.close();
                    return byteArray;
                }
                byteArrayOutputStream.write(bArr, 0, read);
            }
        } catch (Throwable th) {
            inputStream.close();
            throw th;
        }
    }

    public String sendRequestRaw(String str, String str2, ObjectNode objectNode, Map<String, String> map) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            String sendPostRaw = sendPostRaw(str, objectNode.toString(), map);
            LOGGER.info("Query: " + str2 + " Time: " + (System.currentTimeMillis() - currentTimeMillis));
            return sendPostRaw;
        } catch (Exception e) {
            LOGGER.error("Caught exception in sendQueryRaw", (Throwable) e);
            Utils.rethrowException(e);
            throw new AssertionError("Should not reach this");
        }
    }
}
