package org.apache.pinot.query.service.dispatch;

import io.grpc.Deadline;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.calcite.util.Pair;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.OpChainStats;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/dispatch/QueryDispatcher.class */
public class QueryDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) QueryDispatcher.class);
    private static final String PINOT_BROKER_QUERY_DISPATCHER_FORMAT = "multistage-query-dispatch-%d";
    private final MailboxService _mailboxService;
    private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap();
    private final ExecutorService _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), new TracedThreadFactory(5, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT));

    public QueryDispatcher(MailboxService mailboxService) {
        this._mailboxService = mailboxService;
    }

    public ResultTable submitAndReduce(RequestContext requestContext, DispatchableSubPlan dispatchableSubPlan, long j, Map<String, String> map, Map<Integer, ExecutionStatsAggregator> map2) throws Exception {
        long requestId = requestContext.getRequestId();
        try {
            submit(requestId, dispatchableSubPlan, j, map);
            long nanoTime = System.nanoTime();
            ResultTable runReducer = runReducer(requestId, dispatchableSubPlan, j, map, map2, this._mailboxService);
            requestContext.setReduceTimeNanos(System.nanoTime() - nanoTime);
            return runReducer;
        } catch (Throwable th) {
            cancel(requestId, dispatchableSubPlan);
            throw th;
        }
    }

    @VisibleForTesting
    void submit(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map) throws Exception {
        Deadline after = Deadline.after(j2, TimeUnit.MILLISECONDS);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        List<DispatchablePlanFragment> queryStageList = dispatchableSubPlan.getQueryStageList();
        int size = queryStageList.size();
        int i = 0;
        for (int i2 = 1; i2 < size; i2++) {
            for (Map.Entry<QueryServerInstance, List<Integer>> entry : queryStageList.get(i2).getServerInstanceToWorkerIdMap().entrySet()) {
                QueryServerInstance key = entry.getKey();
                Worker.QueryRequest.Builder newBuilder = Worker.QueryRequest.newBuilder();
                newBuilder.addStagePlan(QueryPlanSerDeUtils.serialize(dispatchableSubPlan, i2, key, entry.getValue()));
                Worker.QueryRequest build = newBuilder.putMetadata("requestId", String.valueOf(j)).putMetadata(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(j2)).putAllMetadata(map).build();
                DispatchClient orCreateDispatchClient = getOrCreateDispatchClient(key);
                int i3 = i2;
                this._executorService.submit(() -> {
                    Objects.requireNonNull(linkedBlockingQueue);
                    orCreateDispatchClient.submit(build, i3, key, after, (v1) -> {
                        r5.offer(v1);
                    });
                });
                i++;
            }
        }
        int i4 = 0;
        while (!after.isExpired() && i4 < i) {
            AsyncQueryDispatchResponse asyncQueryDispatchResponse = (AsyncQueryDispatchResponse) linkedBlockingQueue.poll(after.timeRemaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
            if (asyncQueryDispatchResponse != null) {
                if (asyncQueryDispatchResponse.getThrowable() != null) {
                    throw new RuntimeException(String.format("Error dispatching query to server=%s stage=%s", asyncQueryDispatchResponse.getVirtualServer(), Integer.valueOf(asyncQueryDispatchResponse.getStageId())), asyncQueryDispatchResponse.getThrowable());
                }
                Worker.QueryResponse queryResponse = asyncQueryDispatchResponse.getQueryResponse();
                if (queryResponse.containsMetadata("ERROR")) {
                    throw new RuntimeException(String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", Integer.valueOf(asyncQueryDispatchResponse.getStageId()), asyncQueryDispatchResponse.getVirtualServer(), queryResponse.getMetadataOrDefault("ERROR", "null")));
                }
                i4++;
            }
        }
        if (after.isExpired()) {
            throw new TimeoutException("Timed out waiting for response of async query-dispatch");
        }
    }

    private void cancel(long j, DispatchableSubPlan dispatchableSubPlan) {
        List<DispatchablePlanFragment> queryStageList = dispatchableSubPlan.getQueryStageList();
        int size = queryStageList.size();
        HashSet hashSet = new HashSet();
        for (int i = 1; i < size; i++) {
            hashSet.addAll(queryStageList.get(i).getServerInstanceToWorkerIdMap().keySet());
        }
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            getOrCreateDispatchClient((QueryServerInstance) it2.next()).cancel(j);
        }
    }

    private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) {
        String hostname = queryServerInstance.getHostname();
        int queryServicePort = queryServerInstance.getQueryServicePort();
        return this._dispatchClientMap.computeIfAbsent(String.format("%s_%d", hostname, Integer.valueOf(queryServicePort)), str -> {
            return new DispatchClient(hostname, queryServicePort);
        });
    }

    @VisibleForTesting
    public static ResultTable runReducer(long j, DispatchableSubPlan dispatchableSubPlan, long j2, Map<String, String> map, @Nullable Map<Integer, ExecutionStatsAggregator> map2, MailboxService mailboxService) {
        DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(0);
        PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment();
        PlanNode fragmentRoot = planFragment.getFragmentRoot();
        Preconditions.checkState(fragmentRoot instanceof MailboxReceiveNode, "Expecting mailbox receive node as root of reduce stage, got: %s", fragmentRoot.getClass().getSimpleName());
        MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) fragmentRoot;
        List<WorkerMetadata> workerMetadataList = dispatchablePlanFragment.getWorkerMetadataList();
        Preconditions.checkState(workerMetadataList.size() == 1, "Expecting single worker for reduce stage, got: %s", workerMetadataList.size());
        OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(mailboxService, j, planFragment.getFragmentId(), workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + j2, map, new StageMetadata.Builder().setWorkerMetadataList(workerMetadataList).addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build(), null);
        ResultTable resultTable = getResultTable(new MailboxReceiveOperator(opChainExecutionContext, mailboxReceiveNode.getDistributionType(), mailboxReceiveNode.getSenderStageId()), mailboxReceiveNode.getDataSchema(), dispatchableSubPlan.getQueryResultFields());
        collectStats(dispatchableSubPlan, opChainExecutionContext.getStats(), map2);
        return resultTable;
    }

    private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats, @Nullable Map<Integer, ExecutionStatsAggregator> map) {
        if (map != null) {
            LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", Long.valueOf(opChainStats.getExecutionTime()));
            for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) {
                OperatorStats value = entry.getValue();
                ExecutionStatsAggregator executionStatsAggregator = map.get(0);
                ExecutionStatsAggregator executionStatsAggregator2 = map.get(Integer.valueOf(value.getStageId()));
                executionStatsAggregator.aggregate(null, entry.getValue().getExecutionStats(), new HashMap());
                if (executionStatsAggregator2 != null) {
                    if (dispatchableSubPlan != null) {
                        OperatorUtils.recordTableName(value, dispatchableSubPlan.getQueryStageList().get(value.getStageId()));
                    }
                    executionStatsAggregator2.aggregate(null, entry.getValue().getExecutionStats(), new HashMap());
                }
            }
        }
    }

    private static ResultTable getResultTable(MailboxReceiveOperator mailboxReceiveOperator, DataSchema dataSchema, List<Pair<Integer, String>> list) {
        TransferableBlock transferableBlock;
        int size = list.size();
        String[] strArr = new String[size];
        DataSchema.ColumnDataType[] columnDataTypeArr = new DataSchema.ColumnDataType[size];
        for (int i = 0; i < size; i++) {
            Pair<Integer, String> pair = list.get(i);
            strArr[i] = pair.right;
            columnDataTypeArr[i] = dataSchema.getColumnDataType(pair.left.intValue());
        }
        DataSchema dataSchema2 = new DataSchema(strArr, columnDataTypeArr);
        ArrayList arrayList = new ArrayList();
        TransferableBlock nextBlock = mailboxReceiveOperator.nextBlock();
        while (true) {
            transferableBlock = nextBlock;
            if (TransferableBlockUtils.isEndOfStream(transferableBlock)) {
                break;
            }
            DataBlock dataBlock = transferableBlock.getDataBlock();
            int numberOfRows = dataBlock.getNumberOfRows();
            if (numberOfRows > 0) {
                arrayList.ensureCapacity(arrayList.size() + numberOfRows);
                for (Object[] objArr : DataBlockUtils.extractRows(dataBlock, ObjectSerDeUtils::deserialize)) {
                    Object[] objArr2 = new Object[size];
                    for (int i2 = 0; i2 < size; i2++) {
                        Object obj = objArr[list.get(i2).left.intValue()];
                        if (obj != null) {
                            DataSchema.ColumnDataType columnDataType = columnDataTypeArr[i2];
                            objArr2[i2] = columnDataType.format(columnDataType.toExternal(obj));
                        }
                    }
                    arrayList.add(objArr2);
                }
            }
            nextBlock = mailboxReceiveOperator.nextBlock();
        }
        if (transferableBlock.isErrorBlock()) {
            throw new RuntimeException("Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
        }
        return new ResultTable(dataSchema2, arrayList);
    }

    public void shutdown() {
        Iterator<DispatchClient> it2 = this._dispatchClientMap.values().iterator();
        while (it2.hasNext()) {
            it2.next().getChannel().shutdown();
        }
        this._dispatchClientMap.clear();
    }
}
