package org.apache.pinot.query.service;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.util.Pair;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.roaringbitmap.RoaringBitmap;

/* loaded from: input_file:org/apache/pinot/query/service/QueryDispatcher.class */
public class QueryDispatcher {
    /** Dispatch clients created so far, keyed by the "host_port" string built in getOrCreateDispatchClient. */
    private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/pinot/query/service/QueryDispatcher$DispatchClient.class */
    /**
     * Blocking gRPC client bound to a single query worker endpoint.
     *
     * <p>Each instance owns one plaintext {@link ManagedChannel} and the blocking stub created on top of it.
     */
    public static class DispatchClient {
        private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub _blockingStub;
        private final ManagedChannel _managedChannel;

        /**
         * Opens a plaintext channel to the given address and creates a blocking stub on it.
         *
         * @param str host name handed to the channel builder
         * @param i port handed to the channel builder
         */
        public DispatchClient(String str, int i) {
            ManagedChannel channel = ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
            _managedChannel = channel;
            _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(channel);
        }

        /**
         * Sends the request through the blocking stub and returns the worker's response.
         *
         * @param queryRequest request to submit
         * @return the response returned by the stub
         */
        public Worker.QueryResponse submit(Worker.QueryRequest queryRequest) {
            Worker.QueryResponse response = _blockingStub.submit(queryRequest);
            return response;
        }
    }

    /**
     * Dispatches the plan to the workers, then receives and assembles the final result locally.
     *
     * @param j request id forwarded to {@link #submit} and to the receiving operator
     * @param queryPlan plan whose stages are dispatched and whose result fields shape the output
     * @param mailboxService mailbox service used to receive blocks; also supplies the local host name and port
     * @param j2 timeout in milliseconds, forwarded to the workers and used as the receive deadline
     * @return the result table built from the received data blocks
     * @throws Exception if dispatching fails
     */
    public ResultTable submitAndReduce(long j, QueryPlan queryPlan, MailboxService<TransferableBlock> mailboxService, long j2) throws Exception {
        // The stage map is fetched before dispatching, matching the original evaluation order.
        Map<Integer, StageNode> stageMap = queryPlan.getQueryStageMap();
        int reduceStageId = submit(j, queryPlan, j2);
        MailboxReceiveNode reduceNode = (MailboxReceiveNode) stageMap.get(reduceStageId);

        // Build the operator that pulls blocks from every server of the sending stage.
        int senderStageId = reduceNode.getSenderStageId();
        StageMetadata senderMetadata = queryPlan.getStageMetadataMap().get(senderStageId);
        MailboxReceiveOperator receiveOperator = createReduceStageOperator(
                mailboxService,
                senderMetadata.getServerInstances(),
                j,
                senderStageId,
                reduceNode.getDataSchema(),
                mailboxService.getHostname(),
                mailboxService.getMailboxPort(),
                j2);

        List<DataBlock> receivedBlocks = reduceMailboxReceive(receiveOperator, j2);

        // Stage 0's schema is the source schema the result fields index into.
        List<Pair<Integer, String>> resultFields = queryPlan.getQueryResultFields();
        DataSchema sourceSchema = queryPlan.getQueryStageMap().get(0).getDataSchema();
        return toResultTable(receivedBlocks, resultFields, sourceSchema);
    }

    /**
     * Sends every dispatchable stage of the plan to each server assigned to that stage.
     *
     * <p>Stages whose root node is a {@link MailboxReceiveNode} are not dispatched; the id of the last such
     * stage encountered is returned instead.
     *
     * @param j request id, sent as request metadata
     * @param queryPlan plan to dispatch
     * @param j2 timeout in milliseconds, sent as request metadata
     * @return id of the stage rooted at a {@link MailboxReceiveNode}, or -1 if none was found
     * @throws RuntimeException if a worker response carries the error-status metadata key
     * @throws Exception declared for callers; propagated from dispatching
     */
    public int submit(long j, QueryPlan queryPlan, long j2) throws Exception {
        String requestIdValue = String.valueOf(j);
        String timeoutMsValue = String.valueOf(j2);
        int reduceStageId = -1;
        for (Map.Entry<Integer, StageMetadata> stage : queryPlan.getStageMetadataMap().entrySet()) {
            int stageId = stage.getKey();
            if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) {
                reduceStageId = stageId;
                continue;
            }
            for (ServerInstance serverInstance : stage.getValue().getServerInstances()) {
                // Resolve the client first, then build the request (same order as before).
                DispatchClient client =
                        getOrCreateDispatchClient(serverInstance.getHostname(), serverInstance.getQueryServicePort());
                DistributedStagePlan stagePlan = constructDistributedStagePlan(queryPlan, stageId, serverInstance);
                Worker.QueryRequest request = Worker.QueryRequest.newBuilder()
                        .setStagePlan(QueryPlanSerDeUtils.serialize(stagePlan))
                        .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, requestIdValue)
                        .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, timeoutMsValue)
                        .build();
                Worker.QueryResponse response = client.submit(request);
                if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
                    throw new RuntimeException(String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId, serverInstance, response));
                }
            }
        }
        return reduceStageId;
    }

    /**
     * Returns the cached client for the given endpoint, creating and caching one on first use.
     *
     * @param str host name of the worker
     * @param i port of the worker
     * @return the client stored under the "host_port" key
     */
    private DispatchClient getOrCreateDispatchClient(String str, int i) {
        String clientKey = String.format("%s_%d", str, i);
        // The mapping function ignores the key; host and port are captured directly.
        return _dispatchClientMap.computeIfAbsent(clientKey, ignoredKey -> new DispatchClient(str, i));
    }

    /**
     * Builds the per-server plan for one stage.
     *
     * @param queryPlan plan supplying the stage root and the stage metadata map
     * @param i id of the stage to package
     * @param serverInstance server the plan is built for
     * @return a plan combining the stage id, the server, the stage root node and the plan's stage metadata map
     */
    public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int i, ServerInstance serverInstance) {
        StageNode stageRoot = queryPlan.getQueryStageMap().get(i);
        Map<Integer, StageMetadata> metadataMap = queryPlan.getStageMetadataMap();
        return new DistributedStagePlan(i, serverInstance, stageRoot, metadataMap);
    }

    /**
     * Pulls blocks from the receive operator until end-of-stream or until the timeout elapses.
     *
     * <p>No-op blocks are skipped and the operator is polled again. Data blocks are collected in arrival order.
     *
     * @param mailboxReceiveOperator operator to pull blocks from
     * @param j timeout in milliseconds, measured from the moment this method is entered
     * @return the data blocks received before the end-of-stream block
     * @throws RuntimeException if an error block is received, or if the timeout elapses before end-of-stream
     */
    public static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long j) {
        List<DataBlock> resultBlocks = new ArrayList<>();
        // toNanos saturates instead of silently overflowing for very large timeouts.
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
        // nanoTime values must be compared by subtraction: the counter may wrap, so "now < deadline" is unsafe.
        while (deadlineNanos - System.nanoTime() > 0) {
            TransferableBlock block = (TransferableBlock) mailboxReceiveOperator.nextBlock();
            if (TransferableBlockUtils.isEndOfStream(block) && block.isErrorBlock()) {
                throw new RuntimeException("Received error query execution result block: " + block.getDataBlock().getExceptions());
            }
            if (block.isNoOpBlock()) {
                continue;
            }
            if (block.isEndOfStreamBlock()) {
                return resultBlocks;
            }
            resultBlocks.add(block.getDataBlock());
        }
        throw new RuntimeException("Timed out while receiving from mailbox: " + QueryException.EXECUTION_TIMEOUT_ERROR);
    }

    /**
     * Converts received data blocks into a result table containing only the requested fields.
     *
     * <p>Each entry of {@code list2} pairs a source column index (left) with the output column name (right).
     * Output column {@code k} takes its value from the source column referenced by the {@code k}-th entry.
     *
     * @param list data blocks to convert; blocks without rows contribute nothing
     * @param list2 result fields as (source column index, output name) pairs
     * @param dataSchema schema of the source columns, used to derive the output column types
     * @return a table with the projected schema and one row per input row, in block order
     */
    public static ResultTable toResultTable(List<DataBlock> list, List<Pair<Integer, String>> list2, DataSchema dataSchema) {
        DataSchema resultSchema = toResultSchema(dataSchema, list2);
        int numColumns = resultSchema.getColumnNames().length;
        List<Object[]> resultRows = new ArrayList<>();
        for (DataBlock dataBlock : list) {
            if (dataBlock.getNumberOfRows() <= 0) {
                continue;
            }
            // Resolve, once per block, the source column and null bitmap behind every output column.
            // The bitmap is looked up by SOURCE column index, the same index used to read the raw row;
            // looking it up by output position applies the wrong column's null mask whenever the fields
            // reorder or subset the source columns.
            // NOTE(review): assumes getNullRowIds is indexed like the extracted raw rows - confirm.
            int[] sourceColumns = new int[numColumns];
            RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
            int outputColumn = 0;
            for (Pair<Integer, String> field : list2) {
                int sourceColumn = field.left;
                sourceColumns[outputColumn] = sourceColumn;
                nullBitmaps[outputColumn] = dataBlock.getNullRowIds(sourceColumn);
                outputColumn++;
            }
            int rowId = 0;
            for (Object[] rawRow : DataBlockUtils.extractRows(dataBlock)) {
                Object[] row = new Object[numColumns];
                for (int col = 0; col < numColumns; col++) {
                    RoaringBitmap nullRows = nullBitmaps[col];
                    boolean isNull = nullRows != null && nullRows.contains(rowId);
                    row[col] = isNull ? null : rawRow[sourceColumns[col]];
                }
                resultRows.add(row);
                rowId++;
            }
        }
        return new ResultTable(resultSchema, resultRows);
    }

    /**
     * Derives the output schema from the source schema and the requested fields.
     *
     * @param dataSchema schema the field indexes refer to
     * @param list result fields as (source column index, output name) pairs
     * @return a schema whose names are the field names and whose types are those of the referenced source columns
     */
    private static DataSchema toResultSchema(DataSchema dataSchema, List<Pair<Integer, String>> list) {
        int numFields = list.size();
        String[] columnNames = new String[numFields];
        DataSchema.ColumnDataType[] columnTypes = new DataSchema.ColumnDataType[numFields];
        int position = 0;
        for (Pair<Integer, String> field : list) {
            columnNames[position] = field.right;
            columnTypes[position] = dataSchema.getColumnDataType(field.left);
            position++;
        }
        return new DataSchema(columnNames, columnTypes);
    }

    /**
     * Creates the receive operator used to collect the final blocks.
     *
     * @param mailboxService mailbox service the operator receives from
     * @param list server instances of the sending stage
     * @param j request id
     * @param i id of the sending stage
     * @param dataSchema not used by this method
     * @param str host name passed to the operator
     * @param i2 port passed to the operator
     * @param j2 timeout in milliseconds passed to the operator
     * @return a receive operator configured with {@code RANDOM_DISTRIBUTED} distribution
     */
    @VisibleForTesting
    public static MailboxReceiveOperator createReduceStageOperator(MailboxService<TransferableBlock> mailboxService, List<ServerInstance> list, long j, int i, DataSchema dataSchema, String str, int i2, long j2) {
        RelDistribution.Type exchangeType = RelDistribution.Type.RANDOM_DISTRIBUTED;
        Long timeoutMs = Long.valueOf(j2);
        return new MailboxReceiveOperator(mailboxService, list, exchangeType, str, i2, j, i, timeoutMs);
    }

    /**
     * Initiates shutdown of every cached client's channel and then empties the client cache.
     */
    public void shutdown() {
        for (DispatchClient client : _dispatchClientMap.values()) {
            client._managedChannel.shutdown();
        }
        _dispatchClientMap.clear();
    }
}
