package org.apache.pinot.query.service;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/QueryDispatcher.class */
public class QueryDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryDispatcher.class);
    private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/pinot/query/service/QueryDispatcher$DispatchClient.class */
    public static class DispatchClient {
        private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub _blockingStub;
        private final ManagedChannel _managedChannel;

        public DispatchClient(String str, int i) {
            this._managedChannel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            this._blockingStub = PinotQueryWorkerGrpc.newBlockingStub(this._managedChannel);
        }

        public Worker.QueryResponse submit(Worker.QueryRequest queryRequest) {
            return this._blockingStub.submit(queryRequest);
        }
    }

    public List<DataTable> submitAndReduce(long j, QueryPlan queryPlan, MailboxService<Mailbox.MailboxContent> mailboxService, long j2) throws Exception {
        MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(Integer.valueOf(submit(j, queryPlan)));
        return reduceMailboxReceive(createReduceStageOperator(mailboxService, ((StageMetadata) queryPlan.getStageMetadataMap().get(Integer.valueOf(mailboxReceiveNode.getSenderStageId()))).getServerInstances(), j, mailboxReceiveNode.getSenderStageId(), mailboxReceiveNode.getDataSchema(), mailboxService.getHostname(), mailboxService.getMailboxPort()), j2);
    }

    public int submit(long j, QueryPlan queryPlan) throws Exception {
        int i = -1;
        for (Map.Entry entry : queryPlan.getStageMetadataMap().entrySet()) {
            int intValue = ((Integer) entry.getKey()).intValue();
            if (queryPlan.getQueryStageMap().get(Integer.valueOf(intValue)) instanceof MailboxReceiveNode) {
                i = intValue;
            } else {
                for (ServerInstance serverInstance : ((StageMetadata) entry.getValue()).getServerInstances()) {
                    Worker.QueryResponse submit = getOrCreateDispatchClient(serverInstance.getHostname(), serverInstance.getQueryServicePort()).submit(Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, intValue, serverInstance))).putMetadata("REQUEST_ID", String.valueOf(j)).putMetadata("SERVER_INSTANCE_HOST", serverInstance.getHostname()).putMetadata("SERVER_INSTANCE_PORT", String.valueOf(serverInstance.getQueryMailboxPort())).build());
                    if (submit.containsMetadata("ERROR")) {
                        throw new RuntimeException(String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", Integer.valueOf(intValue), serverInstance, submit));
                    }
                }
            }
        }
        return i;
    }

    private DispatchClient getOrCreateDispatchClient(String str, int i) {
        return this._dispatchClientMap.computeIfAbsent(String.format("%s_%d", str, Integer.valueOf(i)), str2 -> {
            return new DispatchClient(str, i);
        });
    }

    public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int i, ServerInstance serverInstance) {
        return new DistributedStagePlan(i, serverInstance, (StageNode) queryPlan.getQueryStageMap().get(Integer.valueOf(i)), queryPlan.getStageMetadataMap());
    }

    public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) {
        return reduceMailboxReceive(mailboxReceiveOperator, QueryConfig.DEFAULT_TIMEOUT_NANO);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [java.util.List] */
    public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long j) {
        ArrayList arrayList = new ArrayList();
        long nanoTime = System.nanoTime() + j;
        while (System.nanoTime() < nanoTime) {
            TransferableBlock transferableBlock = (TransferableBlock) mailboxReceiveOperator.nextBlock();
            if (TransferableBlockUtils.isEndOfStream(transferableBlock) && transferableBlock.isErrorBlock()) {
                throw new RuntimeException("Received error query execution result block: " + transferableBlock.getDataBlock().getExceptions());
            }
            if (transferableBlock.getDataBlock() != null) {
                arrayList.add(transferableBlock.getDataBlock());
            }
            if (transferableBlock.isEndOfStreamBlock()) {
                break;
            }
        }
        if (System.nanoTime() >= nanoTime) {
            arrayList = Collections.singletonList(DataBlockUtils.getErrorDataBlock(QueryException.EXECUTION_TIMEOUT_ERROR));
        }
        return arrayList;
    }

    public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService, List<ServerInstance> list, long j, int i, DataSchema dataSchema, String str, int i2) {
        return new MailboxReceiveOperator(mailboxService, dataSchema, list, RelDistribution.Type.RANDOM_DISTRIBUTED, null, str, i2, j, i);
    }

    public void shutdown() {
        Iterator<DispatchClient> it = this._dispatchClientMap.values().iterator();
        while (it.hasNext()) {
            it.next()._managedChannel.shutdown();
        }
        this._dispatchClientMap.clear();
    }
}
