package org.apache.pinot.broker.api.resources;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ArrayListMultimap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.api.resources.Constants;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
import org.mortbay.jetty.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags = {Constants.QUERY_TAG}, authorizations = {@Authorization(CommonConstants.SWAGGER_AUTHORIZATION_KEY)})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {@ApiKeyAuthDefinition(name = "Authorization", in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.SWAGGER_AUTHORIZATION_KEY)}))
@Path("/")
/* loaded from: input_file:org/apache/pinot/broker/api/resources/PinotClientRequest.class */
public class PinotClientRequest {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) PinotClientRequest.class);

    @Inject
    SqlQueryExecutor _sqlQueryExecutor;

    @Inject
    private BrokerRequestHandler _requestHandler;

    @Inject
    private BrokerMetrics _brokerMetrics;

    @Inject
    private Executor _executor;

    @Inject
    private HttpConnectionManager _httpConnMgr;

    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = HttpStatus.Internal_Server_Error)})
    @Path("query/sql")
    @ManagedAsync
    @ApiOperation("Querying pinot")
    @Produces({"application/json"})
    public void processSqlQueryGet(@QueryParam("sql") @ApiParam(value = "Query", required = true) String str, @QueryParam("trace") @ApiParam("Trace enabled") String str2, @QueryParam("debugOptions") @ApiParam("Debug options") String str3, @Suspended AsyncResponse asyncResponse, @Context Request request) {
        try {
            ObjectNode newObjectNode = JsonUtils.newObjectNode();
            newObjectNode.put("sql", str);
            if (str2 != null) {
                newObjectNode.put(CommonConstants.Broker.Request.TRACE, str2);
            }
            if (str3 != null) {
                newObjectNode.put(CommonConstants.Broker.Request.DEBUG_OPTIONS, str3);
            }
            asyncResponse.resume(executeSqlQuery(newObjectNode, makeHttpIdentity(request), true).toJsonString());
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing GET request", (Throwable) e);
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
            asyncResponse.resume((Throwable) new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }

    @ApiResponses({@ApiResponse(code = 200, message = "Query response"), @ApiResponse(code = 500, message = HttpStatus.Internal_Server_Error)})
    @Path("query/sql")
    @ManagedAsync
    @ApiOperation("Querying pinot")
    @POST
    @Produces({"application/json"})
    public void processSqlQueryPost(String str, @Suspended AsyncResponse asyncResponse, @Context Request request) {
        try {
            JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(str);
            if (!stringToJsonNode.has("sql")) {
                throw new IllegalStateException("Payload is missing the query string field 'sql'");
            }
            asyncResponse.resume(executeSqlQuery((ObjectNode) stringToJsonNode, makeHttpIdentity(request), false).toJsonString());
        } catch (Exception e) {
            LOGGER.error("Caught exception while processing POST request", (Throwable) e);
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
            asyncResponse.resume((Throwable) new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR));
        }
    }

    @ApiResponses({@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Query not found on the requested broker")})
    @Path("query/{queryId}")
    @DELETE
    @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the given queryId on the requested broker. Query may continue to run for a short while after calling cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.")
    @Produces({"application/json"})
    public String cancelQuery(@PathParam("queryId") @ApiParam(value = "QueryId as assigned by the broker", required = true) long j, @QueryParam("timeoutMs") @ApiParam("Timeout for servers to respond the cancel request") @DefaultValue("3000") int i, @QueryParam("verbose") @ApiParam("Return server responses for troubleshooting") @DefaultValue("false") boolean z) {
        HashMap hashMap;
        if (z) {
            try {
                hashMap = new HashMap();
            } catch (Exception e) {
                throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(String.format("Failed to cancel query: %s on the broker due to error: %s", Long.valueOf(j), e.getMessage())).build());
            }
        } else {
            hashMap = null;
        }
        HashMap hashMap2 = hashMap;
        if (!this._requestHandler.cancelQuery(j, i, this._executor, this._httpConnMgr, hashMap2)) {
            throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", Long.valueOf(j))).build());
        }
        String str = "Cancelled query: " + j;
        if (z) {
            str = str + " with responses from servers: " + hashMap2;
        }
        return str;
    }

    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("queries")
    @ApiOperation(value = "Get running queries submitted via the requested broker", notes = "The id is assigned by the requested broker and only unique at the scope of this broker")
    @Produces({"application/json"})
    public Map<Long, String> getRunningQueries() {
        try {
            return this._requestHandler.getRunningQueries();
        } catch (Exception e) {
            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Failed to get running queries on the broker due to error: " + e.getMessage()).build());
        }
    }

    private BrokerResponse executeSqlQuery(ObjectNode objectNode, HttpRequesterIdentity httpRequesterIdentity, boolean z) throws Exception {
        try {
            SqlNodeAndOptions parseQuery = RequestUtils.parseQuery(objectNode.get("sql").asText(), objectNode);
            PinotSqlType sqlType = parseQuery.getSqlType();
            if (z && sqlType != PinotSqlType.DQL) {
                return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL.")));
            }
            switch (sqlType) {
                case DQL:
                    RequestScope createRequestScope = Tracing.getTracer().createRequestScope();
                    try {
                        BrokerResponse handleRequest = this._requestHandler.handleRequest(objectNode, parseQuery, httpRequesterIdentity, createRequestScope);
                        if (createRequestScope != null) {
                            createRequestScope.close();
                        }
                        return handleRequest;
                    } catch (Throwable th) {
                        if (createRequestScope != null) {
                            try {
                                createRequestScope.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                case DML:
                    HashMap hashMap = new HashMap();
                    httpRequesterIdentity.getHttpHeaders().entries().forEach(entry -> {
                        hashMap.put((String) entry.getKey(), (String) entry.getValue());
                    });
                    return this._sqlQueryExecutor.executeDMLStatement(parseQuery, hashMap);
                default:
                    return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new UnsupportedOperationException("Unsupported SQL type - " + sqlType)));
            }
        } catch (Exception e) {
            return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
        }
    }

    private static HttpRequesterIdentity makeHttpIdentity(Request request) {
        ArrayListMultimap create = ArrayListMultimap.create();
        request.getHeaderNames().forEach(str -> {
            request.getHeaders(str).forEach(str -> {
                create.put(str, str);
            });
        });
        HttpRequesterIdentity httpRequesterIdentity = new HttpRequesterIdentity();
        httpRequesterIdentity.setHttpHeaders(create);
        httpRequesterIdentity.setEndpointUrl(request.getRequestURL().toString());
        return httpRequesterIdentity;
    }
}
