package org.apache.pinot.controller.api.resources;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.http.MultiHttpRequest;
import org.apache.pinot.common.http.MultiHttpRequestResponse;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.recommender.rules.io.params.RecommenderConstants;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller REST endpoints for inspecting and cancelling queries that are running on brokers.
 *
 * <p>The controller does not run queries itself: each endpoint forwards the caller's HTTP headers
 * to the relevant broker(s) over HTTP and relays the broker response.
 */
@Api(tags = {Constants.QUERY_TAG}, authorizations = {@Authorization("oauth")})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {@ApiKeyAuthDefinition(name = "Authorization", in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = "oauth")}))
@Path("/")
public class PinotRunningQueryResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(PinotRunningQueryResource.class);

    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;

    @Inject
    ControllerConf _controllerConf;

    @Inject
    private Executor _executor;

    @Inject
    private HttpClientConnectionManager _httpConnMgr;

    /**
     * Forwards a cancel request for one query to the broker that is running it.
     *
     * @param str broker instance id (the {@code brokerId} path parameter)
     * @param j query id as assigned by the broker (the {@code queryId} path parameter)
     * @param i timeout in milliseconds, applied both to leasing a connection and to the socket
     * @param z whether the broker should return a verbose response
     * @param httpHeaders headers of the incoming request; copied onto the request sent to the broker
     * @return the broker's response body when the broker answers with HTTP 200
     * @throws WebApplicationException 400 if the broker is unknown, 404 if the broker reports the
     *     query as not found, 500 for any other broker status or any failure while calling it
     */
    @ApiResponses({@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Query not found on the requested broker")})
    @Path("query/{brokerId}/{queryId}")
    @DELETE
    @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the given queryId on the requested broker. Query may continue to run for a short while after calling cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "CancelQuery")
    public String cancelQuery(@PathParam("brokerId") @ApiParam(value = "Broker that's running the query", required = true) String str, @PathParam("queryId") @ApiParam(value = "QueryId as assigned by the broker", required = true) long j, @QueryParam("timeoutMs") @ApiParam("Timeout for servers to respond the cancel request") @DefaultValue("3000") int i, @QueryParam("verbose") @ApiParam("Return verbose responses for troubleshooting") @DefaultValue("false") boolean z, @Context HttpHeaders httpHeaders) {
        InstanceConfig brokerConfig = _pinotHelixResourceManager.getHelixInstanceConfig(str);
        if (brokerConfig == null) {
            throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST).entity("Unknown broker: " + str).build());
        }
        try {
            RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(i).setSocketTimeout(i).build();
            // NOTE(review): the client is intentionally not closed. It is built on the injected,
            // shared connection manager, and closing an HttpClient 4.x client presumably shuts its
            // connection manager down as well - confirm before adding a close() here.
            CloseableHttpClient client = HttpClients.custom().setConnectionManager(_httpConnMgr).setDefaultRequestConfig(requestConfig).build();
            String protocol = _controllerConf.getControllerBrokerProtocol();
            int portOverride = _controllerConf.getControllerBrokerPortOverride();
            // A positive override replaces the port registered in the broker's Helix instance config.
            int port = portOverride > 0 ? portOverride : Integer.parseInt(brokerConfig.getPort());
            HttpDelete deleteMethod = new HttpDelete(String.format("%s://%s:%d/query/%d?verbose=%b", protocol, brokerConfig.getHostName(), port, j, z));
            try {
                createRequestHeaders(httpHeaders).forEach(deleteMethod::setHeader);
                CloseableHttpResponse response = client.execute(deleteMethod);
                int status = response.getStatusLine().getStatusCode();
                String responseContent = EntityUtils.toString(response.getEntity());
                if (status == 200) {
                    return responseContent;
                }
                if (status == 404) {
                    throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker: %s", j, str)).build());
                }
                throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(String.format("Failed to cancel query: %s on the broker: %s with unexpected status=%d and resp='%s'", j, str, status, responseContent)).build());
            } finally {
                deleteMethod.releaseConnection();
            }
        } catch (WebApplicationException e) {
            // Must be caught before the generic handler so the 404 / status-specific 500 built
            // above reach the client unchanged instead of being re-wrapped.
            throw e;
        } catch (Exception e) {
            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(String.format("Failed to cancel query: %s on the broker: %s due to error: %s", j, str, e.getMessage())).build());
        }
    }

    /**
     * Collects the running queries reported by every live broker.
     *
     * @param i timeout in milliseconds passed to the broker requests
     * @param httpHeaders headers of the incoming request; copied onto the requests sent to brokers
     * @return broker instance name mapped to that broker's parsed JSON response
     * @throws WebApplicationException 500 if the brokers cannot be listed or any broker call fails
     */
    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/queries")
    @ApiOperation(value = "Get running queries from all brokers", notes = "The queries are returned with brokers running them")
    @Produces({"application/json"})
    @Authorize(targetType = TargetType.CLUSTER, action = "GetRunningQuery")
    public Map<String, Map<String, String>> getRunningQueries(@QueryParam("timeoutMs") @ApiParam("Timeout for brokers to return running queries") @DefaultValue("3000") int i, @Context HttpHeaders httpHeaders) {
        try {
            Map<String, List<InstanceInfo>> tableToLiveBrokers = _pinotHelixResourceManager.getTableToLiveBrokersMapping();
            // The same broker can be listed under several tables; keep one entry per host:port.
            Map<String, InstanceInfo> brokers = new HashMap<>();
            for (List<InstanceInfo> liveBrokers : tableToLiveBrokers.values()) {
                for (InstanceInfo broker : liveBrokers) {
                    brokers.putIfAbsent(getInstanceKey(broker), broker);
                }
            }
            return getRunningQueries(brokers, i, createRequestHeaders(httpHeaders));
        } catch (Exception e) {
            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("Failed to get running queries due to error: " + e.getMessage()).build());
        }
    }

    /**
     * Sends {@code GET /queries} to each given broker and gathers the responses.
     *
     * <p>Every response is processed even if an earlier one failed; failures are logged, their
     * messages collected, and a single exception is thrown at the end if there was any failure.
     *
     * @param brokers brokers to query; only the values are used
     * @param timeoutMs timeout in milliseconds passed to {@link MultiHttpRequest}
     * @param requestHeaders headers to send with each broker request
     * @return broker instance name mapped to that broker's parsed JSON response
     * @throws Exception if at least one broker call failed or returned a non-200 status
     */
    private Map<String, Map<String, String>> getRunningQueries(Map<String, InstanceInfo> brokers, int timeoutMs, Map<String, String> requestHeaders) throws Exception {
        String protocol = _controllerConf.getControllerBrokerProtocol();
        int portOverride = _controllerConf.getControllerBrokerPortOverride();
        List<String> brokerUrls = new ArrayList<>(brokers.size());
        // Responses are matched back to brokers via the request URI, so this lookup is keyed by
        // the port actually used in the URL (which is the override port when one is configured),
        // not by the broker's registered port.
        Map<String, InstanceInfo> brokersByEndpoint = new HashMap<>();
        for (InstanceInfo broker : brokers.values()) {
            int port = portOverride > 0 ? portOverride : broker.getPort();
            brokerUrls.add(String.format("%s://%s:%d/queries", protocol, broker.getHost(), port));
            brokersByEndpoint.putIfAbsent(broker.getHost() + ":" + port, broker);
        }
        LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
        CompletionService<MultiHttpRequestResponse> completionService = new MultiHttpRequest(_executor, _httpConnMgr).executeGet(brokerUrls, requestHeaders, timeoutMs);
        Map<String, Map<String, String>> queriesByBroker = new HashMap<>();
        List<String> errMsgs = new ArrayList<>(brokerUrls.size());
        for (int n = 0; n < brokerUrls.size(); n++) {
            MultiHttpRequestResponse httpRequestResponse = null;
            try {
                httpRequestResponse = completionService.take().get();
                URI uri = httpRequestResponse.getURI();
                int status = httpRequestResponse.getResponse().getStatusLine().getStatusCode();
                String responseString = EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
                // Unexpected broker responses are collected below and reported together.
                if (status != 200) {
                    throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status, responseString, uri));
                }
                InstanceInfo broker = brokersByEndpoint.get(getInstanceKey(uri));
                if (broker == null) {
                    throw new Exception(String.format("No broker found for response from uri='%s'", uri));
                }
                @SuppressWarnings("unchecked") // The response is parsed into an untyped JSON map.
                Map<String, String> queries = JsonUtils.stringToObject(responseString, Map.class);
                queriesByBroker.put(broker.getInstanceName(), queries);
            } catch (Exception e) {
                LOGGER.error("Failed to get queries", e);
                errMsgs.add(e.getMessage());
            } finally {
                if (httpRequestResponse != null) {
                    httpRequestResponse.close();
                }
            }
        }
        if (!errMsgs.isEmpty()) {
            throw new Exception("Unexpected responses from brokers: " + StringUtils.join(errMsgs, ","));
        }
        return queriesByBroker;
    }

    /** Returns the {@code host:port} key of a broker, using its own registered port. */
    private static String getInstanceKey(InstanceInfo instanceInfo) {
        return instanceInfo.getHost() + ":" + instanceInfo.getPort();
    }

    /** Returns the {@code host:port} key of a request URI. */
    private static String getInstanceKey(URI uri) {
        return uri.getHost() + ":" + uri.getPort();
    }

    /**
     * Copies the incoming request headers into a plain map, one entry per header name, using
     * {@link HttpHeaders#getHeaderString(String)} as the value.
     */
    private static Map<String, String> createRequestHeaders(HttpHeaders httpHeaders) {
        Map<String, String> requestHeaders = new HashMap<>();
        for (String headerName : httpHeaders.getRequestHeaders().keySet()) {
            requestHeaders.put(headerName, httpHeaders.getHeaderString(headerName));
        }
        return requestHeaders;
    }
}
