package org.apache.pinot.query.service.dispatch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.serde.PlanNodeDeserializer;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.service.dispatch.timeseries.AsyncQueryTimeSeriesDispatchResponse;
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesDispatchablePlan;
import org.apache.pinot.tsdb.planner.physical.TimeSeriesQueryServerInstance;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher.class */
public class QueryDispatcher {
    // Class logger; assigned in the static initializer.
    private static final Logger LOGGER;
    // Name pattern handed to the TracedThreadFactory that creates the threads of _executorService.
    private static final String PINOT_BROKER_QUERY_DISPATCHER_FORMAT = "multistage-query-dispatch-%d";
    // Deserializes the payload of time-series responses; assigned in the static initializer.
    private static final ObjectMapper OBJECT_MAPPER;
    // Started and shut down together with this dispatcher; also handed to the reduce stage.
    private final MailboxService _mailboxService;
    // Runs the asynchronous plan-fragment serialization tasks.
    private final ExecutorService _executorService;
    // Dispatch clients, keyed by "<hostname>_<queryServicePort>".
    private final Map<String, DispatchClient> _dispatchClientMap;
    // Time-series dispatch clients, keyed by "<hostname>_<queryServicePort>".
    private final Map<String, TimeSeriesDispatchClient> _timeSeriesDispatchClientMap;

    // Passed to every DispatchClient this dispatcher creates; may be null.
    @Nullable
    private final TlsConfig _tlsConfig;
    // Explicit form of the compiler's assertion switch (true when assertions are disabled for this class).
    // Methods test it by hand instead of using `assert` statements.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher$QueryResult.class */
    /**
     * Outcome of a reduced query: the result table, one closed stats entry per stage, and the time the
     * broker spent reducing.
     */
    public static class QueryResult {
        private final ResultTable _resultTable;
        private final List<MultiStageQueryStats.StageStats.Closed> _queryStats;
        private final long _brokerReduceTimeMs;

        /**
         * @param resultTable rows and schema returned to the caller
         * @param multiStageQueryStats stats whose current stage must be stage 0; entry 0 of
         *     {@link #getQueryStats()} is its closed current stats, entries {@code 1..maxStageId} are its
         *     upstream stage stats
         * @param j broker reduce time in milliseconds
         * @throws IllegalArgumentException if the current stage of {@code multiStageQueryStats} is not 0
         */
        public QueryResult(ResultTable resultTable, MultiStageQueryStats multiStageQueryStats, long j) {
            this._resultTable = resultTable;
            Preconditions.checkArgument(multiStageQueryStats.getCurrentStageId() == 0, "Expecting query stats for stage 0, got: %s", multiStageQueryStats.getCurrentStageId());
            int numStages = multiStageQueryStats.getMaxStageId() + 1;
            // Diamond instead of a raw ArrayList so the assignment is checked by the compiler.
            this._queryStats = new ArrayList<>(numStages);
            this._queryStats.add(multiStageQueryStats.getCurrentStats().close());
            for (int stageId = 1; stageId < numStages; stageId++) {
                this._queryStats.add(multiStageQueryStats.getUpstreamStageStats(stageId));
            }
            this._brokerReduceTimeMs = j;
        }

        /** Returns the result table passed to the constructor. */
        public ResultTable getResultTable() {
            return this._resultTable;
        }

        /** Returns the per-stage stats list, indexed by stage id (the internal list, not a copy). */
        public List<MultiStageQueryStats.StageStats.Closed> getQueryStats() {
            return this._queryStats;
        }

        /** Returns the broker reduce time in milliseconds. */
        public long getBrokerReduceTimeMs() {
            return this._brokerReduceTimeMs;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher$SendRequest.class */
    /**
     * One asynchronous call issued through a {@link DispatchClient}. Implementations are supplied as
     * lambdas, hence the {@code @FunctionalInterface} marker.
     *
     * @param <E> type of the response delivered to the callback
     */
    @FunctionalInterface
    public interface SendRequest<E> {
        /**
         * Sends {@code queryRequest} to {@code queryServerInstance} through {@code dispatchClient}.
         *
         * @param dispatchClient client used for the call
         * @param queryRequest request to send
         * @param queryServerInstance server the request is addressed to
         * @param deadline deadline for the call
         * @param consumer callback receiving the asynchronous response
         */
        void send(DispatchClient dispatchClient, Worker.QueryRequest queryRequest, QueryServerInstance queryServerInstance, Deadline deadline, Consumer<AsyncResponse<E>> consumer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher$StageInfo.class */
    /** Serialized form of one stage: its root plan node and its custom properties. */
    public static class StageInfo {
        final ByteString _rootNode;
        final ByteString _customProperty;

        private StageInfo(ByteString rootNode, ByteString customProperty) {
            _rootNode = rootNode;
            _customProperty = customProperty;
        }
    }

    public QueryDispatcher(MailboxService mailboxService) {
        this(mailboxService, null);
    }

    public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig) {
        this._dispatchClientMap = new ConcurrentHashMap();
        this._timeSeriesDispatchClientMap = new ConcurrentHashMap();
        this._mailboxService = mailboxService;
        this._executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), new TracedThreadFactory(5, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));
        this._tlsConfig = tlsConfig;
    }

    /** Starts the underlying mailbox service. */
    public void start() {
        _mailboxService.start();
    }

    /**
     * Dispatches the sub-plan to the servers and then reduces the results on the calling thread.
     * On any failure the request is cancelled on the servers before the failure is rethrown.
     *
     * @param requestContext supplies the request id
     * @param dispatchableSubPlan plan to dispatch and reduce
     * @param j timeout in milliseconds, applied to dispatch and to reduce
     * @param map query options forwarded with the request
     * @return the reduced query result
     * @throws Exception whatever dispatching or reducing throws
     */
    public QueryResult submitAndReduce(RequestContext requestContext, DispatchableSubPlan dispatchableSubPlan, long j, Map<String, String> map) throws Exception {
        final long requestId = requestContext.getRequestId();
        final List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
        try {
            submit(requestId, dispatchableSubPlan, j, map);
            QueryResult reduced = runReducer(requestId, dispatchableSubPlan, j, map, _mailboxService);
            return reduced;
        } catch (Throwable failure) {
            cancel(requestId, stagePlans);
            throw failure;
        }
    }

    /**
     * Asks every server hosting {@code dispatchablePlanFragment} to explain it and collects the
     * deserialized plan nodes of all returned stage plans.
     *
     * @param requestContext supplies the request id
     * @param dispatchablePlanFragment fragment to explain
     * @param j timeout in milliseconds
     * @param map query options forwarded with the request
     * @return the plan nodes returned by the servers, in arrival order
     * @throws RuntimeException if a server reports an "ERROR" metadata entry or returns a plan node that
     *     cannot be parsed
     * @throws TimeoutException if the deadline expires before all servers answered
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if serializing the fragment fails
     */
    public List<PlanNode> explain(RequestContext requestContext, DispatchablePlanFragment dispatchablePlanFragment, long j, Map<String, String> map) throws TimeoutException, InterruptedException, ExecutionException {
        long requestId = requestContext.getRequestId();
        List<PlanNode> planNodes = new ArrayList<>();
        List<DispatchablePlanFragment> fragments = Collections.singletonList(dispatchablePlanFragment);
        try {
            // The explicit target type lets execute() infer its response type parameter.
            SendRequest<List<Worker.ExplainResponse>> requestSender =
                (client, request, server, deadline, callback) -> client.explain(request, server, deadline, callback);
            execute(requestId, fragments, j, map, requestSender, (responses, serverInstance) -> {
                for (Worker.ExplainResponse response : responses) {
                    if (response.containsMetadata("ERROR")) {
                        throw new RuntimeException(String.format("Unable to explain query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, response.getMetadataOrDefault("ERROR", "null")));
                    }
                    for (Worker.StagePlan stagePlan : response.getStagePlanList()) {
                        try {
                            planNodes.add(PlanNodeDeserializer.process(Plan.PlanNode.parseFrom(stagePlan.getRootNode())));
                        } catch (InvalidProtocolBufferException e) {
                            // Name the server in the message; the previous code read the exception variable
                            // inside its own initializer instead.
                            throw new RuntimeException("Failed to parse explain plan node for request " + requestId + " from server " + serverInstance, e);
                        }
                    }
                }
            });
            return planNodes;
        } catch (Throwable th) {
            // NOTE(review): cancel() skips index 0 of the list it receives, so with this single-element
            // list it currently contacts no server - confirm whether that is intended.
            cancel(requestId, fragments);
            throw th;
        }
    }

    /**
     * Dispatches a time-series plan and blocks until its response arrives or the timeout elapses.
     * Failures never propagate as exceptions; they are converted into error responses.
     *
     * @param requestContext supplies the request id and broker id
     * @param timeSeriesDispatchablePlan plan to dispatch
     * @param j timeout in milliseconds
     * @param map query options, passed through to the dispatching overload
     * @return the deserialized broker response, or an error response describing the failure
     */
    public PinotBrokerTimeSeriesResponse submitAndGet(RequestContext requestContext, TimeSeriesDispatchablePlan timeSeriesDispatchablePlan, long j, Map<String, String> map) {
        long requestId = requestContext.getRequestId();
        ArrayBlockingQueue<AsyncQueryTimeSeriesDispatchResponse> responses = new ArrayBlockingQueue<>(10);
        try {
            submit(requestId, timeSeriesDispatchablePlan, j, map, requestContext, responses::offer);
            AsyncQueryTimeSeriesDispatchResponse response = responses.poll(j, TimeUnit.MILLISECONDS);
            if (response == null) {
                return PinotBrokerTimeSeriesResponse.newErrorResponse("TimeoutException", "Timed out waiting for response");
            }
            Throwable failure = response.getThrowable();
            if (failure != null) {
                return PinotBrokerTimeSeriesResponse.newErrorResponse(failure.getClass().getSimpleName(), failure.getMessage());
            }
            Worker.TimeSeriesResponse queryResponse = response.getQueryResponse();
            if (queryResponse == null) {
                return PinotBrokerTimeSeriesResponse.newErrorResponse("NullResponse", "Received null response from server");
            }
            if (queryResponse.containsMetadata("error")) {
                return PinotBrokerTimeSeriesResponse.newErrorResponse(queryResponse.getMetadataOrDefault("errorType", "unknown error-type"), queryResponse.getMetadataOrDefault("error", "unknown error"));
            }
            return OBJECT_MAPPER.readValue(queryResponse.getPayload().toStringUtf8(), PinotBrokerTimeSeriesResponse.class);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the error response alone would hide it.
            Thread.currentThread().interrupt();
            return PinotBrokerTimeSeriesResponse.newErrorResponse(e.getClass().getSimpleName(), e.getMessage());
        } catch (Throwable th) {
            return PinotBrokerTimeSeriesResponse.newErrorResponse(th.getClass().getSimpleName(), th.getMessage());
        }
    }

    /**
     * Dispatches every stage except the first one in the stage list to the servers and waits until all
     * of them acknowledged.
     *
     * @param j request id
     * @param dispatchableSubPlan plan whose stages are dispatched
     * @param j2 timeout in milliseconds
     * @param map query options forwarded with the request
     * @throws RuntimeException if a server answers with an "ERROR" metadata entry
     * @throws Exception whatever the dispatch itself throws (timeout, interruption, serialization failure)
     */
    @VisibleForTesting
    void submit(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map) throws Exception {
        // A typed sender is required: with a raw SendRequest the response type of execute() degrades to
        // Object and the metadata accessors below do not resolve.
        SendRequest<Worker.QueryResponse> requestSender =
            (client, request, server, deadline, callback) -> client.submit(request, server, deadline, callback);
        List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
        List<DispatchablePlanFragment> serverStages = stagePlans.subList(1, stagePlans.size());
        execute(j, serverStages, j2, map, requestSender, (response, serverInstance) -> {
            if (response.containsMetadata("ERROR")) {
                throw new RuntimeException(String.format("Unable to execute query plan for request: %d on server: %s, ERROR: %s", j, serverInstance, response.getMetadataOrDefault("ERROR", "null")));
            }
        });
    }

    /**
     * Sends one request per server hosting any of {@code stagePlans} and hands each successful response
     * to {@code resultConsumer} on the calling thread.
     *
     * @param requestId id of the request, used in metadata and messages
     * @param stagePlans stages to serialize and dispatch; entry {@code i} is sent as stage id {@code i + 1}
     * @param timeoutMs overall budget in milliseconds for serialization, dispatch and waiting
     * @param queryOptions options merged into the request metadata
     * @param sendRequest the asynchronous call to perform for each server
     * @param resultConsumer receives each non-null response together with the server it came from
     * @param <E> response type
     * @throws RuntimeException if no server hosts the stages or any server reports a failure
     * @throws TimeoutException if the deadline has expired once waiting ends
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if serializing a stage fails
     */
    private <E> void execute(long requestId, List<DispatchablePlanFragment> stagePlans, long timeoutMs, Map<String, String> queryOptions, SendRequest<E> sendRequest, BiConsumer<E, QueryServerInstance> resultConsumer) throws ExecutionException, InterruptedException, TimeoutException {
        Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS);
        Set<QueryServerInstance> servers = new HashSet<>();
        List<StageInfo> stageInfos = serializePlanFragments(stagePlans, servers, deadline);
        if (servers.isEmpty()) {
            throw new RuntimeException("No server instances to dispatch query to");
        }
        ByteString requestMetadata = QueryPlanSerDeUtils.toProtoProperties(prepareRequestMetadata(requestId, queryOptions, deadline));
        int numServers = servers.size();
        // One slot per server: every server contributes at most one response through its callback.
        ArrayBlockingQueue<AsyncResponse<E>> responses = new ArrayBlockingQueue<>(numServers);
        for (QueryServerInstance server : servers) {
            Consumer<AsyncResponse<E>> callback = response -> {
                if (!responses.offer(response)) {
                    LOGGER.warn("Failed to offer response to dispatchCallbacks queue for query: {} on server: {}", requestId, server);
                }
            };
            try {
                DispatchClient client = getOrCreateDispatchClient(server);
                Worker.QueryRequest request = createRequest(server, stagePlans, stageInfos, requestMetadata);
                sendRequest.send(client, request, server, deadline, callback);
            } catch (Throwable th) {
                LOGGER.warn("Caught exception while dispatching query: {} to server: {}", requestId, server, th);
                // Report the failure through the same queue so the wait loop below surfaces it.
                callback.accept(new AsyncResponse<>(server, null, th));
            }
        }
        int numReceived = 0;
        while (!deadline.isExpired() && numReceived < numServers) {
            AsyncResponse<E> asyncResponse = responses.poll(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            if (asyncResponse == null) {
                continue;
            }
            if (asyncResponse.getThrowable() != null) {
                throw new RuntimeException(String.format("Error dispatching query: %d to server: %s", requestId, asyncResponse.getServerInstance()), asyncResponse.getThrowable());
            }
            E response = asyncResponse.getResponse();
            // Hand-written assertion: the class declares $assertionsDisabled itself, so a real `assert`
            // statement would clash with that field.
            if (!$assertionsDisabled && response == null) {
                throw new AssertionError();
            }
            resultConsumer.accept(response, asyncResponse.getServerInstance());
            numReceived++;
        }
        if (deadline.isExpired()) {
            throw new TimeoutException("Timed out waiting for response of async query-dispatch");
        }
    }

    /**
     * Builds a time-series query request for {@code timeSeriesDispatchablePlan} and sends it
     * asynchronously to the plan's server; the response is delivered to {@code consumer}.
     *
     * @param j request id; written to the "requestId" metadata entry after the generated metadata
     * @param timeSeriesDispatchablePlan plan to send
     * @param j2 timeout in milliseconds, used for the call deadline and the "deadlineMs" metadata entry
     * @param map query options (not used by this method)
     * @param requestContext supplies request id and broker id for the metadata
     * @param consumer receives the asynchronous response; must not be null
     * @throws Exception whatever building or sending the request throws
     */
    void submit(long j, TimeSeriesDispatchablePlan timeSeriesDispatchablePlan, long j2, Map<String, String> map, RequestContext requestContext, Consumer<AsyncQueryTimeSeriesDispatchResponse> consumer) throws Exception {
        Deadline deadline = Deadline.after(j2, TimeUnit.MILLISECONDS);
        long deadlineMs = System.currentTimeMillis() + j2;
        Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
            .addDispatchPlan(timeSeriesDispatchablePlan.getSerializedPlan())
            .putAllMetadata(initializeTimeSeriesMetadataMap(timeSeriesDispatchablePlan, deadlineMs, requestContext))
            .putMetadata("requestId", Long.toString(j))
            .build();
        TimeSeriesQueryServerInstance target = timeSeriesDispatchablePlan.getQueryServerInstance();
        TimeSeriesDispatchClient client = getOrCreateTimeSeriesDispatchClient(target);
        QueryServerInstance serverInstance = new QueryServerInstance(target.getHostname(), target.getQueryServicePort(), target.getQueryMailboxPort());
        // The method reference fails fast with a NullPointerException when consumer is null.
        client.submit(request, serverInstance, deadline, consumer::accept);
    }

    /**
     * Builds the metadata entries sent with a time-series request: language, time-bucket description,
     * deadline, one segment list per plan id, request id and broker id.
     *
     * @param timeSeriesDispatchablePlan source of language, time buckets and plan-id-to-segments mapping
     * @param j value written to the "deadlineMs" entry
     * @param requestContext source of the "requestId" and "brokerId" entries
     * @return a new mutable map holding the entries
     */
    Map<String, String> initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan timeSeriesDispatchablePlan, long j, RequestContext requestContext) {
        Map<String, String> metadata = new HashMap<>();
        TimeBuckets buckets = timeSeriesDispatchablePlan.getTimeBuckets();
        metadata.put("language", timeSeriesDispatchablePlan.getLanguage());
        metadata.put("startTimeSeconds", Long.toString(buckets.getTimeBuckets()[0]));
        metadata.put("windowSeconds", Long.toString(buckets.getBucketSize().getSeconds()));
        metadata.put("numElements", Long.toString(buckets.getTimeBuckets().length));
        metadata.put("deadlineMs", Long.toString(j));
        // Segment lists are stored comma-joined under a key derived from the plan id.
        Map<String, List<String>> planIdToSegments = timeSeriesDispatchablePlan.getPlanIdToSegments();
        planIdToSegments.forEach((planId, segments) ->
            metadata.put(TimeSeriesPlanConstants.WorkerRequestMetadataKeys.encodeSegmentListKey(planId), String.join(",", segments)));
        metadata.put("requestId", Long.toString(requestContext.getRequestId()));
        metadata.put("brokerId", requestContext.getBrokerId());
        return metadata;
    }

    /**
     * Assembles the request for one server: a stage plan for every stage that has workers on that
     * server, plus the shared request metadata.
     *
     * @param serverInstance server the request is built for
     * @param stagePlans stages being dispatched; entry {@code i} is emitted with stage id {@code i + 1}
     * @param stageInfos serialized form of {@code stagePlans}, index-aligned with it
     * @param requestMetadata serialized request metadata
     * @return the built request (version 1)
     */
    private static Worker.QueryRequest createRequest(QueryServerInstance serverInstance, List<DispatchablePlanFragment> stagePlans, List<StageInfo> stageInfos, ByteString requestMetadata) {
        Worker.QueryRequest.Builder requestBuilder = Worker.QueryRequest.newBuilder();
        requestBuilder.setVersion(1);
        for (int index = 0; index < stagePlans.size(); index++) {
            DispatchablePlanFragment stagePlan = stagePlans.get(index);
            Map<QueryServerInstance, List<Integer>> workerIdsByServer = stagePlan.getServerInstanceToWorkerIdMap();
            List<Integer> workerIds = workerIdsByServer.get(serverInstance);
            if (workerIds == null) {
                // This stage has no worker on the server.
                continue;
            }
            List<WorkerMetadata> stageWorkers = stagePlan.getWorkerMetadataList();
            List<WorkerMetadata> serverWorkers = new ArrayList<>(workerIds.size());
            for (Integer workerId : workerIds) {
                serverWorkers.add(stageWorkers.get(workerId.intValue()));
            }
            Worker.StageMetadata.Builder stageMetadata = Worker.StageMetadata.newBuilder()
                .setStageId(index + 1)
                .addAllWorkerMetadata(QueryPlanSerDeUtils.toProtoWorkerMetadataList(serverWorkers));
            StageInfo stageInfo = stageInfos.get(index);
            stageMetadata.setCustomProperty(stageInfo._customProperty);
            requestBuilder.addStagePlan(Worker.StagePlan.newBuilder()
                .setRootNode(stageInfo._rootNode)
                .setStageMetadata(stageMetadata.build())
                .build());
        }
        requestBuilder.setMetadata(requestMetadata);
        return requestBuilder.build();
    }

    /**
     * Builds the request metadata: "requestId" and "timeoutMs" (time remaining on the deadline),
     * followed by all query options, which overwrite those two entries on key collision.
     */
    private static Map<String, String> prepareRequestMetadata(long requestId, Map<String, String> queryOptions, Deadline deadline) {
        Map<String, String> metadata = new HashMap<>();
        String remainingMs = Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS));
        metadata.put("requestId", Long.toString(requestId));
        metadata.put("timeoutMs", remainingMs);
        metadata.putAll(queryOptions);
        return metadata;
    }

    /**
     * Serializes all stages on the executor and collects every server hosting any of them.
     *
     * @param stagePlans stages to serialize
     * @param serverInstances output set; receives the servers of every stage
     * @param deadline bounds the wait for each serialization task
     * @return serialized stages, index-aligned with {@code stagePlans}
     * @throws InterruptedException if interrupted while waiting for a task
     * @throws ExecutionException if a serialization task fails
     * @throws TimeoutException if a task does not finish within the remaining time
     */
    private List<StageInfo> serializePlanFragments(List<DispatchablePlanFragment> stagePlans, Set<QueryServerInstance> serverInstances, Deadline deadline) throws InterruptedException, ExecutionException, TimeoutException {
        int numStages = stagePlans.size();
        List<CompletableFuture<StageInfo>> pending = new ArrayList<>(numStages);
        for (DispatchablePlanFragment stagePlan : stagePlans) {
            serverInstances.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet());
            pending.add(CompletableFuture.supplyAsync(() -> serializePlanFragment(stagePlan), _executorService));
        }
        List<StageInfo> stageInfos = new ArrayList<>(numStages);
        try {
            for (CompletableFuture<StageInfo> future : pending) {
                long remainingMs = deadline.timeRemaining(TimeUnit.MILLISECONDS);
                stageInfos.add(future.get(remainingMs, TimeUnit.MILLISECONDS));
            }
            return stageInfos;
        } finally {
            // Cancel whatever is still outstanding when leaving, e.g. after a timeout or failure.
            for (CompletableFuture<StageInfo> future : pending) {
                if (!future.isDone()) {
                    future.cancel(true);
                }
            }
        }
    }

    /**
     * Serializes the root node and the custom properties of one stage.
     * (Decompiler note: the access modifier of this method was changed from private.)
     */
    public static StageInfo serializePlanFragment(DispatchablePlanFragment dispatchablePlanFragment) {
        ByteString rootNode = PlanNodeSerializer.process(dispatchablePlanFragment.getPlanFragment().getFragmentRoot()).toByteString();
        ByteString customProperty = QueryPlanSerDeUtils.toProtoProperties(dispatchablePlanFragment.getCustomProperties());
        return new StageInfo(rootNode, customProperty);
    }

    /**
     * Best-effort cancellation of a request on every server hosting any stage except the first entry
     * of {@code stagePlans}. Failures are logged and never propagated.
     */
    private void cancel(long requestId, List<DispatchablePlanFragment> stagePlans) {
        Set<QueryServerInstance> serversToCancel = new HashSet<>();
        // Index 0 is deliberately left out.
        stagePlans.stream()
            .skip(1)
            .forEach(stagePlan -> serversToCancel.addAll(stagePlan.getServerInstanceToWorkerIdMap().keySet()));
        for (QueryServerInstance server : serversToCancel) {
            try {
                getOrCreateDispatchClient(server).cancel(requestId);
            } catch (Throwable th) {
                LOGGER.warn("Caught exception while cancelling query: {} on server: {}", new Object[]{Long.valueOf(requestId), server, th});
            }
        }
    }

    /** Returns the cached client for the server's hostname and query-service port, creating it on first use. */
    private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) {
        final String host = queryServerInstance.getHostname();
        final int port = queryServerInstance.getQueryServicePort();
        String cacheKey = String.format("%s_%d", host, Integer.valueOf(port));
        return _dispatchClientMap.computeIfAbsent(cacheKey, ignored -> new DispatchClient(host, port, _tlsConfig));
    }

    /** Returns the cached time-series client for the server's hostname and query-service port, creating it on first use. */
    private TimeSeriesDispatchClient getOrCreateTimeSeriesDispatchClient(TimeSeriesQueryServerInstance timeSeriesQueryServerInstance) {
        final String host = timeSeriesQueryServerInstance.getHostname();
        final int port = timeSeriesQueryServerInstance.getQueryServicePort();
        String cacheKey = String.format("%s_%d", host, Integer.valueOf(port));
        return _timeSeriesDispatchClientMap.computeIfAbsent(cacheKey, ignored -> new TimeSeriesDispatchClient(host, port));
    }

    /**
     * Runs the reduce stage (first entry of the stage list) on the calling thread: reads blocks from a
     * mailbox receive operator until end of stream and projects every row onto the query result fields.
     *
     * @param j request id
     * @param dispatchableSubPlan plan providing the reduce stage and the result fields
     * @param j2 timeout in milliseconds; the deadline passed to the execution context is now + {@code j2}
     * @param map query options passed to the execution context
     * @param mailboxService mailbox service the receive operator reads from
     * @return result table, query stats of the final block, and the elapsed reduce time
     * @throws IllegalStateException if the reduce stage root is not a mailbox receive node or the stage
     *     does not have exactly one worker
     * @throws RuntimeException if the stream ends with an error block
     */
    @VisibleForTesting
    public static QueryResult runReducer(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map, MailboxService mailboxService) {
        long startTimeMs = System.currentTimeMillis();
        long deadlineMs = startTimeMs + j2;

        List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
        DispatchablePlanFragment reduceStage = stagePlans.get(0);
        // Check the node type first and only then narrow it.
        PlanNode rootNode = reduceStage.getPlanFragment().getFragmentRoot();
        Preconditions.checkState(rootNode instanceof MailboxReceiveNode, "Expecting mailbox receive node as root of reduce stage, got: %s", rootNode.getClass().getSimpleName());
        MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;

        List<WorkerMetadata> workerMetadataList = reduceStage.getWorkerMetadataList();
        Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s", workerMetadataList.size());
        StageMetadata stageMetadata = new StageMetadata(0, workerMetadataList, reduceStage.getCustomProperties());
        OpChainExecutionContext executionContext = new OpChainExecutionContext(mailboxService, j, deadlineMs, map, stageMetadata, workerMetadataList.get(0), null, Tracing.getThreadAccountant().getThreadExecutionContext());

        // Each result field pairs a column index in the received schema (key) with the output column name (value).
        PairList<Integer, String> resultFields = dispatchableSubPlan.getQueryResultFields();
        DataSchema sourceSchema = receiveNode.getDataSchema();
        int numColumns = resultFields.size();
        int[] sourceIndexes = new int[numColumns];
        String[] columnNames = new String[numColumns];
        DataSchema.ColumnDataType[] columnTypes = new DataSchema.ColumnDataType[numColumns];
        for (int i = 0; i < numColumns; i++) {
            Map.Entry<Integer, String> field = resultFields.get(i);
            sourceIndexes[i] = field.getKey();
            columnNames[i] = field.getValue();
            columnTypes[i] = sourceSchema.getColumnDataType(sourceIndexes[i]);
        }
        DataSchema resultSchema = new DataSchema(columnNames, columnTypes);

        ArrayList<Object[]> rows = new ArrayList<>();
        TransferableBlock block;
        // try-with-resources closes the operator exactly once, on success and on failure alike.
        try (MailboxReceiveOperator receiveOperator = new MailboxReceiveOperator(executionContext, receiveNode)) {
            block = receiveOperator.m41nextBlock();
            while (!TransferableBlockUtils.isEndOfStream(block)) {
                DataBlock dataBlock = block.getDataBlock();
                int numRows = dataBlock.getNumberOfRows();
                if (numRows > 0) {
                    rows.ensureCapacity(rows.size() + numRows);
                    for (Object[] sourceRow : DataBlockExtractUtils.extractRows(dataBlock)) {
                        Object[] resultRow = new Object[numColumns];
                        for (int col = 0; col < numColumns; col++) {
                            Object value = sourceRow[sourceIndexes[col]];
                            // Null cells stay null in the output row.
                            if (value != null) {
                                DataSchema.ColumnDataType columnType = columnTypes[col];
                                resultRow[col] = columnType.format(columnType.toExternal(value));
                            }
                        }
                        rows.add(resultRow);
                    }
                }
                block = receiveOperator.m41nextBlock();
            }
        }
        if (block.isErrorBlock()) {
            throw new RuntimeException("Received error query execution result block: " + block.getExceptions());
        }
        // Hand-written assertions: the class declares $assertionsDisabled itself, so real `assert`
        // statements would clash with that field.
        if (!$assertionsDisabled && !block.isSuccessfulEndOfStreamBlock()) {
            throw new AssertionError();
        }
        MultiStageQueryStats queryStats = block.getQueryStats();
        if (!$assertionsDisabled && queryStats == null) {
            throw new AssertionError();
        }
        return new QueryResult(new ResultTable(resultSchema, rows), queryStats, System.currentTimeMillis() - startTimeMs);
    }

    /**
     * Shuts down the channel of every cached dispatch client, clears that cache, and then shuts down the
     * mailbox service and the executor.
     */
    public void shutdown() {
        // NOTE(review): the time-series client cache is not touched here - confirm whether those
        // clients need an explicit shutdown as well.
        for (DispatchClient client : _dispatchClientMap.values()) {
            client.getChannel().shutdown();
        }
        _dispatchClientMap.clear();
        _mailboxService.shutdown();
        _executorService.shutdown();
    }

    // Assigns the blank static finals declared at the top of the class.
    static {
        // True when assertions are disabled for this class; methods consult it before their manual checks.
        $assertionsDisabled = !QueryDispatcher.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
        OBJECT_MAPPER = new ObjectMapper();
    }
}
