package org.apache.pinot.query.runtime;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.MultiplexingMailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/QueryRunner.class */
/**
 * Executes the stages of a distributed query plan that are assigned to this server.
 *
 * <p>A stage whose metadata assigns segments to this server instance (a "leaf" stage) is executed through
 * {@link ServerQueryExecutorV1Impl} and its results are pushed to the receiving stage with a
 * {@link MailboxSendOperator}. Every other stage is compiled by {@link PhysicalPlanVisitor} and registered
 * with the {@link OpChainSchedulerService}.
 *
 * <p>Lifecycle: {@link #init} must be called before {@link #start}, and {@link #start} before
 * {@link #processQuery}; the fields below are only populated by those calls.
 */
public class QueryRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);
    private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor";

    /** Upper bound, in seconds, on waiting for the scheduler service to start or to terminate. */
    private static final long SCHEDULER_STATE_CHANGE_TIMEOUT_SECONDS = 30L;

    /**
     * Error code recorded on the response block when the leaf-stage executor throws.
     * NOTE(review): presumably mirrors the query-execution error code declared on QueryException; confirm and
     * reference that constant directly if so.
     */
    private static final int LEAF_QUERY_ERROR_CODE = 200;

    private ServerQueryExecutorV1Impl _serverExecutor;
    private HelixManager _helixManager;
    private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
    private MailboxService<TransferableBlock> _mailboxService;
    private String _hostname;
    private int _port;
    private OpChainSchedulerService _scheduler;

    /**
     * Reads the runner's hostname/port from configuration and builds the scheduler, the mailbox service and
     * the leaf-stage query executor. Nothing is started here; see {@link #start}.
     *
     * @param pinotConfiguration configuration holding the query-runner and executor settings
     * @param instanceDataManager data manager handed to the leaf-stage executor
     * @param helixManager Helix manager; its property store is fetched later, in {@link #start}
     * @param serverMetrics metrics sink handed to the leaf-stage executor
     * @throws NullPointerException if the query-runner hostname is not configured
     * @throws RuntimeException wrapping any exception raised while building the components
     */
    public void init(PinotConfiguration pinotConfiguration, InstanceDataManager instanceDataManager, HelixManager helixManager, ServerMetrics serverMetrics) {
        // Fail with an actionable message instead of a bare NPE when the hostname is missing.
        String instanceName = Objects.requireNonNull(
                pinotConfiguration.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME),
                "Missing required query runner configuration: " + QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
        // The configured value may be a Helix server instance id; strip its prefix to obtain the hostname.
        if (instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
            _hostname = instanceName.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
        } else {
            _hostname = instanceName;
        }
        _port = pinotConfiguration.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 0);
        _helixManager = helixManager;
        try {
            long releaseTimeoutMs = pinotConfiguration.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, -1L);
            ExecutorService workerPool = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
                    new NamedThreadFactory("query_worker_on_" + _port + "_port"));
            _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseTimeoutMs), workerPool, releaseTimeoutMs);
            // The mailbox service notifies the scheduler whenever data arrives for a mailbox.
            _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, pinotConfiguration, _scheduler::onDataAvailable);
            _serverExecutor = new ServerQueryExecutorV1Impl();
            _serverExecutor.init(pinotConfiguration.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the Helix property store and starts the mailbox service, the leaf-stage executor and the
     * scheduler, in that order.
     *
     * @throws TimeoutException if the scheduler is not running within
     *     {@link #SCHEDULER_STATE_CHANGE_TIMEOUT_SECONDS} seconds
     */
    public void start() throws TimeoutException {
        _helixPropertyStore = _helixManager.getHelixPropertyStore();
        _mailboxService.start();
        _serverExecutor.start();
        _scheduler.startAsync().awaitRunning(SCHEDULER_STATE_CHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Shuts down the leaf-stage executor, the mailbox service and the scheduler, in that order. Each step is
     * attempted even when an earlier one throws, so a failure in one component does not leave the others running.
     *
     * @throws TimeoutException if the scheduler has not terminated within
     *     {@link #SCHEDULER_STATE_CHANGE_TIMEOUT_SECONDS} seconds
     */
    public void shutDown() throws TimeoutException {
        try {
            _serverExecutor.shutDown();
        } finally {
            try {
                _mailboxService.shutdown();
            } finally {
                _scheduler.stopAsync().awaitTerminated(SCHEDULER_STATE_CHANGE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        }
    }

    /**
     * Runs one stage of a distributed plan on this server.
     *
     * <p>Non-leaf stages are handed to the scheduler and this method returns immediately. Leaf stages are
     * executed and their result blocks are sent on the calling thread until end-of-stream is reached.
     *
     * @param distributedStagePlan the stage to run together with the metadata of all stages
     * @param map request metadata; for non-leaf stages it must contain the broker request id and the broker
     *     request timeout as parseable long values
     * @throws NumberFormatException if a non-leaf stage's request id or timeout is absent or not a long
     */
    public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> map) {
        if (isLeafStage(distributedStagePlan)) {
            runLeafStage(distributedStagePlan, map);
        } else {
            registerIntermediateStage(distributedStagePlan, map);
        }
    }

    /** Compiles a non-leaf stage into an operator chain and registers it with the scheduler. */
    private void registerIntermediateStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
        long requestId = Long.parseLong(requestMetadata.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
        long timeoutMs = Long.parseLong(requestMetadata.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
        StageNode stageRoot = distributedStagePlan.getStageRoot();
        PlanRequestContext planContext = new PlanRequestContext(_mailboxService, requestId, stageRoot.getStageId(), timeoutMs,
                _hostname, _port, distributedStagePlan.getMetadataMap());
        _scheduler.register(PhysicalPlanVisitor.build(stageRoot, planContext));
    }

    /** Executes every per-table-type server request of a leaf stage and sends the results to the receiver stage. */
    private void runLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) {
        List<ServerPlanRequestContext> serverRequests =
                constructServerQueryRequests(distributedStagePlan, requestMetadata, _helixPropertyStore, _mailboxService);
        List<InstanceResponseBlock> responseBlocks = new ArrayList<>(serverRequests.size());
        for (ServerPlanRequestContext serverRequest : serverRequests) {
            ServerQueryRequest queryRequest = new ServerQueryRequest(serverRequest.getInstanceRequest(),
                    new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
            responseBlocks.add(processServerQuery(queryRequest, _scheduler.getWorkerPool()));
        }

        // The root of a leaf stage is cast to a mailbox-send node, which describes where the results go.
        MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
        MailboxSendOperator sendOperator = new MailboxSendOperator(
                _mailboxService,
                new LeafStageTransferableBlockOperator(responseBlocks, sendNode.getDataSchema()),
                distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()).getServerInstances(),
                sendNode.getExchangeType(),
                sendNode.getPartitionKeySelector(),
                _hostname,
                _port,
                serverRequests.get(0).getRequestId(),
                sendNode.getStageId());

        // Drain the operator: each nextBlock() call is what actually pushes a block to the receivers.
        int blockCounter = 0;
        while (!TransferableBlockUtils.isEndOfStream(sendOperator.nextBlock())) {
            LOGGER.debug("Acquired transferable block: {}", blockCounter++);
        }
    }

    /**
     * Builds one server request per table type (OFFLINE / REALTIME) that the stage metadata assigns to this
     * server instance.
     *
     * @throws IllegalStateException if the stage does not scan exactly one table
     * @throws IllegalArgumentException if a key of the per-server segment map is not a supported table type
     */
    private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, Map<String, String> map, ZkHelixPropertyStore<ZNRecord> zkHelixPropertyStore, MailboxService<TransferableBlock> mailboxService) {
        StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
        Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, "Server request for V2 engine should only have 1 scan table per request.");
        String rawTableName = stageMetadata.getScannedTables().get(0);
        Map<String, List<String>> segmentsByTableType =
                stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());

        List<ServerPlanRequestContext> requests = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : segmentsByTableType.entrySet()) {
            TableType tableType = resolveTableType(entry.getKey());
            String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
            requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, map,
                    ZKMetadataProvider.getTableConfig(zkHelixPropertyStore, tableNameWithType),
                    ZKMetadataProvider.getTableSchema(zkHelixPropertyStore, tableNameWithType),
                    stageMetadata.getTimeBoundaryInfo(), tableType, entry.getValue()));
        }
        return requests;
    }

    /**
     * Maps a key of the per-server segment map to its table type.
     *
     * @throws IllegalArgumentException if the key is neither {@code OFFLINE} nor {@code REALTIME}
     */
    private static TableType resolveTableType(String tableTypeKey) {
        if (TableType.OFFLINE.name().equals(tableTypeKey)) {
            return TableType.OFFLINE;
        }
        if (TableType.REALTIME.name().equals(tableTypeKey)) {
            return TableType.REALTIME;
        }
        throw new IllegalArgumentException("Unsupported table type key: " + tableTypeKey);
    }

    /**
     * Executes a single leaf-stage request. A failure is not propagated; it is converted into a response
     * block carrying the error message and truncated stack trace under {@link #LEAF_QUERY_ERROR_CODE}.
     */
    private InstanceResponseBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
        try {
            return _serverExecutor.execute(serverQueryRequest, executorService);
        } catch (Exception e) {
            InstanceResponseBlock errorResponse = new InstanceResponseBlock();
            errorResponse.getExceptions().put(LEAF_QUERY_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e));
            return errorResponse;
        }
    }

    /**
     * Returns whether the stage is a leaf stage for this server, i.e. the stage metadata holds a non-empty
     * segment map for the plan's server instance.
     */
    private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
        StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
        Map<String, List<String>> segmentsByTableType =
                stageMetadata.getServerInstanceToSegmentsMap().get(distributedStagePlan.getServerInstance());
        return segmentsByTableType != null && !segmentsByTableType.isEmpty();
    }
}
