package org.apache.pinot.query.service.dispatch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.trace.RequestContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher.class */
public class QueryDispatcher {
    /** Class logger; assigned in the static initializer of this class. */
    private static final Logger LOGGER;
    /** Format string handed to the {@link TracedThreadFactory} that backs {@link #_executorService}. */
    private static final String PINOT_BROKER_QUERY_DISPATCHER_FORMAT = "multistage-query-dispatch-%d";
    /** Mailbox service passed to the reducer when results are collected. */
    private final MailboxService _mailboxService;
    /** Explicit copy of the assertion-status flag; assigned in the static initializer of this class. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Dispatch clients keyed by {@code "<hostname>_<queryServicePort>"}; created lazily, one per server. */
    private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
    /**
     * Pool used both for serializing stage plans and for sending the per-server dispatch requests.
     * NOTE(review): the first two factory arguments (5, false) look like thread priority and daemon flag - confirm
     * against TracedThreadFactory.
     */
    private final ExecutorService _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), new TracedThreadFactory(5, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher$StageInfo.class */
    /**
     * Immutable holder for the two serialized pieces of a stage that are identical for every server:
     * the serialized root plan node and the serialized custom properties.
     */
    public static class StageInfo {
        final ByteString _rootNode;
        final ByteString _customProperty;

        /**
         * @param rootNode serialized root node of the stage's plan fragment
         * @param customProperty serialized custom properties of the stage
         */
        private StageInfo(ByteString rootNode, ByteString customProperty) {
            _rootNode = rootNode;
            _customProperty = customProperty;
        }
    }

    /**
     * Creates a dispatcher that hands the given mailbox service to the reducer.
     *
     * @param mailboxService mailbox service used when reading the query results
     */
    public QueryDispatcher(MailboxService mailboxService) {
        _mailboxService = mailboxService;
    }

    /**
     * Dispatches the plan to the servers, reduces the results and records the time spent reducing.
     *
     * <p>If either step throws, the query is cancelled on every participating server before the
     * original throwable is rethrown.
     *
     * @param requestContext supplies the request id and receives the reduce time in nanoseconds
     * @param dispatchableSubPlan plan to dispatch and reduce
     * @param j timeout in milliseconds, applied to dispatching and to reducing
     * @param map additional entries forwarded to the servers and to the reducer
     * @param map2 optional per-stage stats aggregators; may be {@code null}
     * @return the reduced result table
     * @throws Exception whatever dispatching or reducing throws
     */
    public ResultTable submitAndReduce(RequestContext requestContext, DispatchableSubPlan dispatchableSubPlan, long j, Map<String, String> map, @Nullable Map<Integer, ExecutionStatsAggregator> map2) throws Exception {
        final long requestId = requestContext.getRequestId();
        final ResultTable resultTable;
        try {
            submit(requestId, dispatchableSubPlan, j, map);
            final long reduceStartNanos = System.nanoTime();
            resultTable = runReducer(requestId, dispatchableSubPlan, j, map, map2, this._mailboxService);
            final long reduceEndNanos = System.nanoTime();
            requestContext.setReduceTimeNanos(reduceEndNanos - reduceStartNanos);
        } catch (Throwable failure) {
            // Best-effort cleanup on the servers; cancel() logs and swallows its own errors.
            cancel(requestId, dispatchableSubPlan);
            throw failure;
        }
        return resultTable;
    }

    /**
     * Sends the query plan to every server that hosts at least one worker and waits for all of them to
     * acknowledge it.
     *
     * <p>The work happens in three phases that share one deadline:
     * <ol>
     *   <li>every stage after index 0 is serialized on the dispatcher executor;</li>
     *   <li>one request per server is built (containing only the stages that server has workers for) and
     *       submitted through that server's dispatch client;</li>
     *   <li>the calling thread waits until each server has answered, or the deadline expires.</li>
     * </ol>
     *
     * @param j request id; sent to the servers under the metadata key {@code "requestId"}
     * @param dispatchableSubPlan plan whose stages are dispatched
     * @param j2 timeout in milliseconds for the whole dispatch
     * @param map extra metadata entries; added last, so they win over the generated
     *     {@code "requestId"} / {@code "timeoutMs"} entries on a key clash
     * @throws RuntimeException if dispatching to a server failed or a server answered with {@code "ERROR"} metadata
     * @throws TimeoutException if the deadline has expired once waiting ends
     * @throws Exception any failure raised while waiting for the serialized stages
     */
    @VisibleForTesting
    void submit(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map) throws Exception {
        Deadline deadline = Deadline.after(j2, TimeUnit.MILLISECONDS);
        List<DispatchablePlanFragment> stages = dispatchableSubPlan.getQueryStageList();
        // Index 0 is not dispatched here; runReducer() reads it as the reduce stage.
        int numStages = stages.size() - 1;

        // Phase 1: serialize the server-independent part of each stage in parallel.
        HashSet<QueryServerInstance> servers = new HashSet<>();
        List<CompletableFuture<StageInfo>> stageInfoFutures = new ArrayList<>(numStages);
        for (int i = 0; i < numStages; i++) {
            DispatchablePlanFragment stage = stages.get(i + 1);
            servers.addAll(stage.getServerInstanceToWorkerIdMap().keySet());
            stageInfoFutures.add(CompletableFuture.supplyAsync(
                () -> new StageInfo(
                    StageNodeSerDeUtils.serializeStageNode(stage.getPlanFragment().getFragmentRoot()).toByteString(),
                    QueryPlanSerDeUtils.toProtoProperties(stage.getCustomProperties())),
                this._executorService));
        }
        List<StageInfo> stageInfos = new ArrayList<>(numStages);
        try {
            for (CompletableFuture<StageInfo> future : stageInfoFutures) {
                stageInfos.add(future.get(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS));
            }
        } finally {
            // Only relevant when the loop above failed: once it completes normally every future is done.
            for (CompletableFuture<StageInfo> future : stageInfoFutures) {
                if (!future.isDone()) {
                    future.cancel(true);
                }
            }
        }

        // Metadata shared by every request.
        Map<String, String> requestMetadata = new HashMap<>();
        requestMetadata.put("requestId", Long.toString(j));
        requestMetadata.put("timeoutMs", Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
        requestMetadata.putAll(map);
        ByteString serializedMetadata = QueryPlanSerDeUtils.toProtoProperties(requestMetadata);

        // Phase 2: build and send one request per server; every outcome ends up in the response queue.
        int numServers = servers.size();
        ArrayBlockingQueue<AsyncQueryDispatchResponse> responses = new ArrayBlockingQueue<>(numServers);
        for (QueryServerInstance server : servers) {
            this._executorService.submit(() -> {
                try {
                    Worker.QueryRequest.Builder request = Worker.QueryRequest.newBuilder();
                    for (int i = 0; i < numStages; i++) {
                        int stageId = i + 1;
                        DispatchablePlanFragment stage = stages.get(stageId);
                        List<Integer> workerIds = stage.getServerInstanceToWorkerIdMap().get(server);
                        if (workerIds == null) {
                            // This server runs no worker of this stage.
                            continue;
                        }
                        List<WorkerMetadata> allWorkers = stage.getWorkerMetadataList();
                        List<WorkerMetadata> serverWorkers = new ArrayList<>(workerIds.size());
                        for (int workerId : workerIds) {
                            serverWorkers.add(allWorkers.get(workerId));
                        }
                        StageInfo stageInfo = stageInfos.get(i);
                        request.addStagePlan(Worker.StagePlan.newBuilder()
                            .setRootNode(stageInfo._rootNode)
                            .setStageMetadata(Worker.StageMetadata.newBuilder()
                                .setStageId(stageId)
                                .addAllWorkerMetadata(QueryPlanSerDeUtils.toProtoWorkerMetadataList(serverWorkers))
                                .setCustomProperty(stageInfo._customProperty)
                                .build())
                            .build());
                    }
                    request.setMetadata(serializedMetadata);
                    getOrCreateDispatchClient(server).submit(request.build(), server, deadline, responses::offer);
                } catch (Throwable t) {
                    LOGGER.warn("Caught exception while dispatching query: {} to server: {}", j, server, t);
                    // Report the failure through the queue so the waiting thread fails fast.
                    responses.offer(new AsyncQueryDispatchResponse(server, null, t));
                }
            });
        }

        // Phase 3: wait for one successful response per server.
        int numSuccessfulResponses = 0;
        while (!deadline.isExpired() && numSuccessfulResponses < numServers) {
            AsyncQueryDispatchResponse response = responses.poll(deadline.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            if (response == null) {
                // poll() timed out; the loop condition decides whether to keep waiting.
                continue;
            }
            if (response.getThrowable() != null) {
                throw new RuntimeException(String.format("Error dispatching query: %d to server: %s", j, response.getServerInstance()), response.getThrowable());
            }
            Worker.QueryResponse queryResponse = response.getQueryResponse();
            // Assertion expressed through the class's explicit flag rather than the assert keyword.
            if (!$assertionsDisabled && queryResponse == null) {
                throw new AssertionError();
            }
            if (queryResponse.containsMetadata("ERROR")) {
                throw new RuntimeException(String.format("Unable to execute query plan for request: %d on server: %s, ERROR: %s", j, response.getServerInstance(), queryResponse.getMetadataOrDefault("ERROR", "null")));
            }
            numSuccessfulResponses++;
        }
        if (deadline.isExpired()) {
            throw new TimeoutException("Timed out waiting for response of async query-dispatch");
        }
    }

    /**
     * Asks every server that hosts a worker of any stage after index 0 to cancel the request.
     * A failure on one server is logged and does not stop the remaining cancellations.
     *
     * @param j id of the request to cancel
     * @param dispatchableSubPlan plan used to find the participating servers
     */
    private void cancel(long j, DispatchableSubPlan dispatchableSubPlan) {
        List<DispatchablePlanFragment> stages = dispatchableSubPlan.getQueryStageList();
        HashSet<QueryServerInstance> servers = new HashSet<>();
        final int numStages = stages.size();
        for (int stageId = 1; stageId < numStages; stageId++) {
            DispatchablePlanFragment stage = stages.get(stageId);
            servers.addAll(stage.getServerInstanceToWorkerIdMap().keySet());
        }
        servers.forEach(server -> {
            try {
                DispatchClient client = getOrCreateDispatchClient(server);
                client.cancel(j);
            } catch (Throwable t) {
                LOGGER.warn("Caught exception while cancelling query: {} on server: {}", j, server, t);
            }
        });
    }

    /**
     * Returns the dispatch client for the given server, creating and caching it on first use.
     * Clients are keyed by {@code "<hostname>_<queryServicePort>"}.
     *
     * @param queryServerInstance server to talk to
     * @return the cached or newly created client
     */
    private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) {
        final String host = queryServerInstance.getHostname();
        final int port = queryServerInstance.getQueryServicePort();
        final String clientKey = String.format("%s_%d", host, port);
        return this._dispatchClientMap.computeIfAbsent(clientKey, unusedKey -> new DispatchClient(host, port));
    }

    /**
     * Reads the query result through a mailbox-receive operator built from stage 0 of the plan and
     * converts it into a {@link ResultTable}.
     *
     * @param j request id
     * @param dispatchableSubPlan plan whose stage 0 is the reduce stage
     * @param j2 timeout in milliseconds; the execution deadline is "now + j2"
     * @param map entries handed to the operator execution context
     * @param map2 optional per-stage stats aggregators; may be {@code null}
     * @param mailboxService mailbox service the receive operator reads from
     * @return the reduced result table
     * @throws IllegalStateException if the root of stage 0 is not a {@link MailboxReceiveNode} or the stage does
     *     not have exactly one worker
     */
    @VisibleForTesting
    public static ResultTable runReducer(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map, @Nullable Map<Integer, ExecutionStatsAggregator> map2, MailboxService mailboxService) {
        DispatchablePlanFragment reduceStage = dispatchableSubPlan.getQueryStageList().get(0);
        // Hold the root untyped until it has been checked, so the precondition is a real runtime check
        // instead of following an implicit narrowing to MailboxReceiveNode.
        Object fragmentRoot = reduceStage.getPlanFragment().getFragmentRoot();
        Preconditions.checkState(fragmentRoot instanceof MailboxReceiveNode, "Expecting mailbox receive node as root of reduce stage, got: %s", fragmentRoot.getClass().getSimpleName());
        MailboxReceiveNode receiveNode = (MailboxReceiveNode) fragmentRoot;

        List<WorkerMetadata> workerMetadataList = reduceStage.getWorkerMetadataList();
        Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s", workerMetadataList.size());

        StageMetadata stageMetadata = new StageMetadata(0, workerMetadataList, reduceStage.getCustomProperties());
        long deadlineMs = System.currentTimeMillis() + j2;
        OpChainExecutionContext executionContext = new OpChainExecutionContext(mailboxService, j, deadlineMs, map, stageMetadata, workerMetadataList.get(0), null);
        MailboxReceiveOperator receiveOperator = new MailboxReceiveOperator(executionContext, receiveNode.getDistributionType(), receiveNode.getSenderStageId());

        ResultTable resultTable = getResultTable(receiveOperator, receiveNode.getDataSchema(), dispatchableSubPlan.getQueryResultFields());
        // Stats are read only after the operator has been fully drained.
        collectStats(dispatchableSubPlan, executionContext.getStats(), map2);
        return resultTable;
    }

    /**
     * Feeds every operator's execution stats into the supplied aggregators: always into the one
     * registered under key 0, and additionally into the one registered for the operator's own stage id
     * when such an aggregator exists. Does nothing when the aggregator map is {@code null} or empty.
     *
     * @param dispatchableSubPlan plan used to look up the stage an operator belongs to
     * @param opChainStats stats collected while reducing
     * @param map aggregators keyed by stage id; may be {@code null}
     */
    private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats, @Nullable Map<Integer, ExecutionStatsAggregator> map) {
        if (!MapUtils.isNotEmpty(map)) {
            return;
        }
        for (OperatorStats stats : opChainStats.getOperatorStatsMap().values()) {
            ExecutionStatsAggregator rootAggregator = map.get(0);
            rootAggregator.aggregate((ServerRoutingInstance) null, stats.getExecutionStats(), new HashMap<>());

            ExecutionStatsAggregator stageAggregator = map.get(stats.getStageId());
            if (stageAggregator == null) {
                continue;
            }
            DispatchablePlanFragment stage = dispatchableSubPlan.getQueryStageList().get(stats.getStageId());
            OperatorUtils.recordTableName(stats, stage);
            stageAggregator.aggregate((ServerRoutingInstance) null, stats.getExecutionStats(), new HashMap<>());
        }
    }

    /**
     * Drains the receive operator and builds the result table.
     *
     * <p>Each entry of {@code list} describes one output column: {@code left} is the index of the column
     * in the received rows (and in {@code dataSchema}), {@code right} is the output column name. Non-null
     * cell values are converted with the column type's {@code toExternal} and {@code format}; null cells
     * stay null.
     *
     * @param mailboxReceiveOperator operator to read blocks from until end of stream
     * @param dataSchema schema of the received rows
     * @param list output columns as (source index, output name) pairs
     * @return result table containing all received rows, projected to the output columns
     * @throws RuntimeException if the stream ends with an error block
     */
    private static ResultTable getResultTable(MailboxReceiveOperator mailboxReceiveOperator, DataSchema dataSchema, List<Pair<Integer, String>> list) {
        int numColumns = list.size();
        // Unbox the source indexes once instead of once per cell.
        int[] sourceIndexes = new int[numColumns];
        String[] columnNames = new String[numColumns];
        DataSchema.ColumnDataType[] columnTypes = new DataSchema.ColumnDataType[numColumns];
        for (int i = 0; i < numColumns; i++) {
            Pair<Integer, String> field = list.get(i);
            columnNames[i] = field.right;
            sourceIndexes[i] = field.left;
            columnTypes[i] = dataSchema.getColumnDataType(sourceIndexes[i]);
        }
        DataSchema resultSchema = new DataSchema(columnNames, columnTypes);

        ArrayList<Object[]> resultRows = new ArrayList<>();
        TransferableBlock block = mailboxReceiveOperator.nextBlock();
        while (!TransferableBlockUtils.isEndOfStream(block)) {
            DataBlock dataBlock = block.getDataBlock();
            int numRows = dataBlock.getNumberOfRows();
            if (numRows > 0) {
                resultRows.ensureCapacity(resultRows.size() + numRows);
                for (Object[] rawRow : DataBlockExtractUtils.extractRows(dataBlock)) {
                    Object[] row = new Object[numColumns];
                    for (int i = 0; i < numColumns; i++) {
                        Object rawValue = rawRow[sourceIndexes[i]];
                        if (rawValue != null) {
                            DataSchema.ColumnDataType columnType = columnTypes[i];
                            row[i] = columnType.format(columnType.toExternal(rawValue));
                        }
                    }
                    resultRows.add(row);
                }
            }
            block = mailboxReceiveOperator.nextBlock();
        }
        // The block that ended the stream is either a normal end-of-stream marker or an error.
        if (block.isErrorBlock()) {
            throw new RuntimeException("Received error query execution result block: " + block.getExceptions());
        }
        return new ResultTable(resultSchema, resultRows);
    }

    /**
     * Releases the resources held by this dispatcher: shuts down the channel of every cached dispatch
     * client, forgets the clients, and shuts down the dispatcher thread pool.
     *
     * <p>The pool is shut down in an orderly way: tasks that were already submitted still run, new ones
     * are rejected. Without this step the pool's worker threads would stay alive after shutdown.
     */
    public void shutdown() {
        for (DispatchClient client : this._dispatchClientMap.values()) {
            client.getChannel().shutdown();
        }
        this._dispatchClientMap.clear();
        this._executorService.shutdown();
    }

    // Initializes the two static finals that are declared without an initializer.
    static {
        // True when assertions are NOT enabled for this class; guards the explicit AssertionError in submit().
        $assertionsDisabled = !QueryDispatcher.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
    }
}
