package org.apache.pinot.broker.requesthandler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.core.query.reduce.StreamingReduceService;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.trace.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker request handler that dispatches per-server requests through gRPC streaming clients and
 * hands the resulting response streams to a {@link StreamingReduceService} for reduction.
 */
@ThreadSafe
public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);

    private final GrpcConfig _grpcConfig;
    private final StreamingReduceService _streamingReduceService;
    private final PinotStreamingQueryClient _streamingQueryClient;

    /**
     * Caches one {@link GrpcQueryClient} per (host, port) pair and submits streaming requests through it.
     */
    public static class PinotStreamingQueryClient {
        // Keyed by "<host>_<port>"; a concurrent map so clients can be created lazily from any thread.
        private final Map<String, GrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
        private final GrpcConfig _config;

        /**
         * @param grpcConfig configuration handed to every {@link GrpcQueryClient} this instance creates
         */
        public PinotStreamingQueryClient(GrpcConfig grpcConfig) {
            _config = grpcConfig;
        }

        /**
         * Submits a request to the server at the given address, creating the client on first use.
         *
         * @param str server host name
         * @param i server gRPC port
         * @param serverRequest request to send
         * @return the iterator returned by {@link GrpcQueryClient#submit} for this request
         */
        public Iterator<Server.ServerResponse> submit(String str, int i, Server.ServerRequest serverRequest) {
            return getOrCreateGrpcQueryClient(str, i).submit(serverRequest);
        }

        /**
         * Returns the cached client for the given address, creating and caching it when absent.
         */
        private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
            String clientKey = String.format("%s_%d", host, Integer.valueOf(port));
            return _grpcQueryClientMap.computeIfAbsent(clientKey, ignoredKey -> new GrpcQueryClient(host, port, _config));
        }

        /**
         * Closes every cached client.
         *
         * <p>A failure while closing one client does not prevent the remaining clients from being closed.
         * The first failure is rethrown once all clients have been attempted, with any later failures
         * attached as suppressed exceptions.
         *
         * @throws RuntimeException the first exception raised by a client's {@code close()}
         */
        public void shutdown() {
            RuntimeException firstFailure = null;
            for (GrpcQueryClient client : _grpcQueryClientMap.values()) {
                try {
                    client.close();
                } catch (RuntimeException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    /**
     * Builds the gRPC configuration from {@code pinotConfiguration} and creates the streaming query
     * client and the streaming reduce service. All arguments except {@code tlsConfig} are forwarded to
     * the {@link BaseBrokerRequestHandler} constructor.
     *
     * @param pinotConfiguration broker configuration; also used to build the {@link GrpcConfig} and the
     *     {@link StreamingReduceService}
     * @param str forwarded to the base handler (presumably the broker id; confirm against the base class)
     * @param brokerRoutingManager forwarded to the base handler
     * @param accessControlFactory forwarded to the base handler
     * @param queryQuotaManager forwarded to the base handler
     * @param tableCache forwarded to the base handler
     * @param brokerMetrics forwarded to the base handler
     * @param tlsConfig not used by this constructor
     * @param brokerQueryEventListener forwarded to the base handler
     */
    public GrpcBrokerRequestHandler(PinotConfiguration pinotConfiguration, String str, BrokerRoutingManager brokerRoutingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, TlsConfig tlsConfig, BrokerQueryEventListener brokerQueryEventListener) {
        super(pinotConfiguration, str, brokerRoutingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics, brokerQueryEventListener);
        LOGGER.info("Using Grpc BrokerRequestHandler.");
        _grpcConfig = GrpcConfig.buildGrpcQueryConfig(pinotConfiguration);
        _streamingQueryClient = new PinotStreamingQueryClient(_grpcConfig);
        _streamingReduceService = new StreamingReduceService(pinotConfiguration);
    }

    /** No start-up work is required for this handler. */
    @Override
    public void start() {
    }

    /**
     * Shuts down the streaming query client and then the streaming reduce service.
     *
     * <p>The reduce service is shut down even when closing the query clients fails.
     */
    @Override
    public synchronized void shutDown() {
        try {
            _streamingQueryClient.shutdown();
        } finally {
            _streamingReduceService.shutDown();
        }
    }

    /**
     * Sends the offline and/or realtime request to every server in the matching routing table, then
     * reduces the collected response streams into a single broker response. The time spent in the
     * reduce call is recorded on {@code requestContext}.
     *
     * @param j request id set on every server request
     * @param brokerRequest broker request handed to the reduce service
     * @param brokerRequest2 not used by this implementation
     * @param brokerRequest3 request sent with {@link TableType#OFFLINE}, or {@code null} to skip it
     * @param map routing table for {@code brokerRequest3}; expected non-null whenever that request is
     * @param brokerRequest4 request sent with {@link TableType#REALTIME}, or {@code null} to skip it
     * @param map2 routing table for {@code brokerRequest4}; expected non-null whenever that request is
     * @param j2 passed through to the reduce service (presumably the reduce timeout; confirm against
     *     {@code StreamingReduceService#reduceOnStreamResponse})
     * @param serverStats not used by this implementation
     * @param requestContext supplies the sampled-request flag used as the trace flag and receives the
     *     reduce time in nanoseconds
     * @return the response produced by the streaming reduce service
     */
    @Override
    protected BrokerResponseNative processBrokerRequest(long j, BrokerRequest brokerRequest, BrokerRequest brokerRequest2, @Nullable BrokerRequest brokerRequest3, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> map, @Nullable BrokerRequest brokerRequest4, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> map2, long j2, BaseBrokerRequestHandler.ServerStats serverStats, RequestContext requestContext) throws Exception {
        // At least one of the offline/realtime requests must be present.
        assert brokerRequest3 != null || brokerRequest4 != null;

        Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseStreams = new HashMap<>();
        if (brokerRequest3 != null) {
            assert map != null;
            sendRequest(j, TableType.OFFLINE, brokerRequest3, map, responseStreams, requestContext.isSampledRequest());
        }
        if (brokerRequest4 != null) {
            assert map2 != null;
            sendRequest(j, TableType.REALTIME, brokerRequest4, map2, responseStreams, requestContext.isSampledRequest());
        }

        long reduceStartNanos = System.nanoTime();
        BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(brokerRequest, responseStreams, j2, _brokerMetrics);
        requestContext.setReduceTimeNanos(System.nanoTime() - reduceStartNanos);
        return brokerResponse;
    }

    /**
     * Submits one streaming request per server in {@code routingTable} and stores each returned
     * response iterator in {@code responseStreams}, keyed by the server's gRPC routing instance.
     * Only the left element of each routing-table pair is sent as the segment list.
     */
    private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, Map<ServerInstance, Pair<List<String>, List<String>>> routingTable, Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseStreams, boolean enableTrace) {
        for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> routingEntry : routingTable.entrySet()) {
            ServerInstance serverInstance = routingEntry.getKey();
            List<String> segments = routingEntry.getValue().getLeft();
            // Resolve the routing key before submitting, matching the original evaluation order.
            ServerRoutingInstance routingInstance = serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC);
            Server.ServerRequest serverRequest = new GrpcRequestBuilder()
                    .setRequestId(requestId)
                    .setBrokerId(_brokerId)
                    .setEnableTrace(enableTrace)
                    .setEnableStreaming(true)
                    .setBrokerRequest(brokerRequest)
                    .setSegments(segments)
                    .build();
            responseStreams.put(routingInstance, _streamingQueryClient.submit(serverInstance.getHostname(), serverInstance.getGrpcPort(), serverRequest));
        }
    }
}
