package org.apache.pinot.query.runtime.executor;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.query.request.context.ThreadTimer;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.class */
public class WorkerQueryExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class);
    private PinotConfiguration _config;
    private ServerMetrics _serverMetrics;
    private MailboxService<Mailbox.MailboxContent> _mailboxService;
    private String _hostName;
    private int _port;

    public void init(PinotConfiguration pinotConfiguration, ServerMetrics serverMetrics, MailboxService<Mailbox.MailboxContent> mailboxService, String str, int i) {
        this._config = pinotConfiguration;
        this._serverMetrics = serverMetrics;
        this._mailboxService = mailboxService;
        this._hostName = str;
        this._port = i;
    }

    public synchronized void start() {
        LOGGER.info("Worker query executor started");
    }

    public synchronized void shutDown() {
        LOGGER.info("Worker query executor shut down");
    }

    public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> map, ExecutorService executorService) {
        final BaseOperator<TransferableBlock> operator = getOperator(Long.parseLong(map.get("REQUEST_ID")), distributedStagePlan.getStageRoot(), distributedStagePlan.getMetadataMap());
        executorService.submit((Runnable) new TraceRunnable() { // from class: org.apache.pinot.query.runtime.executor.WorkerQueryExecutor.1
            public void runJob() {
                ThreadTimer threadTimer = new ThreadTimer();
                while (!TransferableBlockUtils.isEndOfStream((TransferableBlock) operator.nextBlock())) {
                    WorkerQueryExecutor.LOGGER.debug("Result Block acquired");
                }
                WorkerQueryExecutor.LOGGER.info("Execution time:" + threadTimer.getThreadTimeNs());
            }
        });
    }

    private BaseOperator<TransferableBlock> getOperator(long j, StageNode stageNode, Map<Integer, StageMetadata> map) {
        if (stageNode instanceof MailboxReceiveNode) {
            MailboxReceiveNode mailboxReceiveNode = (MailboxReceiveNode) stageNode;
            return new MailboxReceiveOperator(this._mailboxService, mailboxReceiveNode.getDataSchema(), map.get(Integer.valueOf(mailboxReceiveNode.getSenderStageId())).getServerInstances(), mailboxReceiveNode.getExchangeType(), mailboxReceiveNode.getPartitionKeySelector(), this._hostName, this._port, j, mailboxReceiveNode.getSenderStageId());
        }
        if (stageNode instanceof MailboxSendNode) {
            MailboxSendNode mailboxSendNode = (MailboxSendNode) stageNode;
            return new MailboxSendOperator(this._mailboxService, mailboxSendNode.getDataSchema(), getOperator(j, (StageNode) mailboxSendNode.getInputs().get(0), map), map.get(Integer.valueOf(mailboxSendNode.getReceiverStageId())).getServerInstances(), mailboxSendNode.getExchangeType(), mailboxSendNode.getPartitionKeySelector(), this._hostName, this._port, j, mailboxSendNode.getStageId());
        }
        if (stageNode instanceof JoinNode) {
            JoinNode joinNode = (JoinNode) stageNode;
            return new HashJoinOperator(getOperator(j, (StageNode) joinNode.getInputs().get(0), map), ((StageNode) joinNode.getInputs().get(0)).getDataSchema(), getOperator(j, (StageNode) joinNode.getInputs().get(1), map), ((StageNode) joinNode.getInputs().get(1)).getDataSchema(), joinNode.getDataSchema(), joinNode.getCriteria());
        }
        if (stageNode instanceof AggregateNode) {
            AggregateNode aggregateNode = (AggregateNode) stageNode;
            return new AggregateOperator(getOperator(j, (StageNode) aggregateNode.getInputs().get(0), map), aggregateNode.getDataSchema(), aggregateNode.getAggCalls(), aggregateNode.getGroupSet(), ((StageNode) aggregateNode.getInputs().get(0)).getDataSchema());
        }
        if (stageNode instanceof FilterNode) {
            throw new UnsupportedOperationException("Unsupported!");
        }
        if (!(stageNode instanceof ProjectNode)) {
            throw new UnsupportedOperationException(String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
        }
        ProjectNode projectNode = (ProjectNode) stageNode;
        return new TransformOperator(getOperator(j, (StageNode) projectNode.getInputs().get(0), map), projectNode.getDataSchema(), projectNode.getProjects(), ((StageNode) projectNode.getInputs().get(0)).getDataSchema());
    }
}
