package org.apache.pinot.broker.requesthandler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.failuredetector.FailureDetector;
import org.apache.pinot.broker.failuredetector.FailureDetectorFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.transport.QueryResponse;
import org.apache.pinot.core.transport.QueryRouter;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.class */
public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandler implements FailureDetector.Listener {
    private static final Logger LOGGER;
    private final BrokerReduceService _brokerReduceService;
    private final QueryRouter _queryRouter;
    private final FailureDetector _failureDetector;
    static final /* synthetic */ boolean $assertionsDisabled;

    public SingleConnectionBrokerRequestHandler(PinotConfiguration pinotConfiguration, BrokerRoutingManager brokerRoutingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig) {
        super(pinotConfiguration, brokerRoutingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
        LOGGER.info("Using Netty BrokerRequestHandler.");
        this._brokerReduceService = new BrokerReduceService(this._config);
        this._queryRouter = new QueryRouter(this._brokerId, brokerMetrics, nettyConfig, tlsConfig);
        this._failureDetector = FailureDetectorFactory.getFailureDetector(pinotConfiguration, brokerMetrics);
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public void start() {
        this._failureDetector.register(this);
        this._failureDetector.start();
    }

    @Override // org.apache.pinot.broker.requesthandler.BrokerRequestHandler
    public synchronized void shutDown() {
        this._failureDetector.stop();
        this._queryRouter.shutDown();
        this._brokerReduceService.shutDown();
    }

    @Override // org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler
    protected BrokerResponseNative processBrokerRequest(long j, BrokerRequest brokerRequest, BrokerRequest brokerRequest2, @Nullable BrokerRequest brokerRequest3, @Nullable Map<ServerInstance, List<String>> map, @Nullable BrokerRequest brokerRequest4, @Nullable Map<ServerInstance, List<String>> map2, long j2, BaseBrokerRequestHandler.ServerStats serverStats, RequestContext requestContext) throws Exception {
        if (!$assertionsDisabled && brokerRequest3 == null && brokerRequest4 == null) {
            throw new AssertionError();
        }
        if (requestContext.isSampledRequest()) {
            brokerRequest2.getPinotQuery().putToQueryOptions("trace", "true");
        }
        String extractRawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
        long nanoTime = System.nanoTime();
        QueryResponse submitQuery = this._queryRouter.submitQuery(j, extractRawTableName, brokerRequest3, map, brokerRequest4, map2, j2);
        this._failureDetector.notifyQuerySubmitted(submitQuery);
        Map finalResponses = submitQuery.getFinalResponses();
        this._failureDetector.notifyQueryFinished(submitQuery);
        this._brokerMetrics.addPhaseTiming(extractRawTableName, BrokerQueryPhase.SCATTER_GATHER, System.nanoTime() - nanoTime);
        serverStats.setServerStats(submitQuery.getServerStats());
        int size = finalResponses.size();
        long j3 = 0;
        HashMap hashMap = new HashMap(HashUtil.getHashMapCapacity(size));
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : finalResponses.entrySet()) {
            DataTable dataTable = ((ServerResponse) entry.getValue()).getDataTable();
            if (dataTable != null) {
                hashMap.put((ServerRoutingInstance) entry.getKey(), dataTable);
                j3 += r0.getResponseSize();
            } else {
                arrayList.add((ServerRoutingInstance) entry.getKey());
            }
        }
        int size2 = hashMap.size();
        long nanoTime2 = System.nanoTime();
        BrokerResponseNative reduceOnDataTable = this._brokerReduceService.reduceOnDataTable(brokerRequest, brokerRequest2, hashMap, j2 - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime), this._brokerMetrics);
        long nanoTime3 = System.nanoTime() - nanoTime2;
        requestContext.setReduceTimeNanos(nanoTime3);
        this._brokerMetrics.addPhaseTiming(extractRawTableName, BrokerQueryPhase.REDUCE, nanoTime3);
        reduceOnDataTable.setNumServersQueried(size);
        reduceOnDataTable.setNumServersResponded(size2);
        Exception exception = submitQuery.getException();
        if (exception != null) {
            reduceOnDataTable.addToExceptions(new QueryProcessingException(425, QueryException.getTruncatedStackTrace(exception)));
        }
        int size3 = arrayList.size();
        if (size3 != 0) {
            reduceOnDataTable.addToExceptions(new QueryProcessingException(427, String.format("%d servers %s not responded", Integer.valueOf(size3), arrayList)));
            this._brokerMetrics.addMeteredTableValue(extractRawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED, 1L);
        }
        if (reduceOnDataTable.getExceptionsSize() > 0) {
            this._brokerMetrics.addMeteredTableValue(extractRawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1L);
        }
        this._brokerMetrics.addMeteredTableValue(extractRawTableName, BrokerMeter.TOTAL_SERVER_RESPONSE_SIZE, j3);
        return reduceOnDataTable;
    }

    @Override // org.apache.pinot.broker.failuredetector.FailureDetector.Listener
    public void notifyUnhealthyServer(String str, FailureDetector failureDetector) {
        this._routingManager.excludeServerFromRouting(str);
    }

    @Override // org.apache.pinot.broker.failuredetector.FailureDetector.Listener
    public void retryUnhealthyServer(String str, FailureDetector failureDetector) {
        LOGGER.info("Retrying unhealthy server: {}", str);
        ServerInstance serverInstance = this._routingManager.getEnabledServerInstanceMap().get(str);
        if (serverInstance == null) {
            LOGGER.info("Failed to find enabled server: {} in routing manager, skipping the retry", str);
        } else if (!this._queryRouter.connect(serverInstance)) {
            LOGGER.warn("Still cannot connect to server: {}, retry later", str);
        } else {
            LOGGER.info("Successfully connect to server: {}, marking it healthy", str);
            failureDetector.markServerHealthy(str);
        }
    }

    @Override // org.apache.pinot.broker.failuredetector.FailureDetector.Listener
    public void notifyHealthyServer(String str, FailureDetector failureDetector) {
        this._routingManager.includeServerToRouting(str);
    }

    static {
        $assertionsDisabled = !SingleConnectionBrokerRequestHandler.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(SingleConnectionBrokerRequestHandler.class);
    }
}
