package org.apache.pinot.query.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.MailboxIdUtils;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.RoutingInfo;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.tsdb.planner.TimeSeriesPlanConstants;
import org.apache.pinot.tsdb.spi.PinotTimeSeriesConfiguration;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator;
import org.apache.pinot.tsdb.spi.plan.serde.TimeSeriesPlanSerde;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBuilderFactoryProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/runtime/QueryRunner.class */
/**
 * Server-side runner that initializes the services needed for query execution (mailbox service, op-chain
 * scheduler, leaf query executor) and dispatches incoming stage plans, time-series plans, cancel requests and
 * explain requests to them.
 *
 * <p>All collaborators are created in {@link #init}; {@link #start()} must be called before queries are
 * processed and {@link #shutDown()} releases the resources again.
 */
public class QueryRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class);

    // Collaborators; all of them are assigned in init().
    private String _hostname;
    private int _port;
    private HelixManager _helixManager;
    private ServerMetrics _serverMetrics;
    private ExecutorService _executorService;
    private OpChainSchedulerService _opChainScheduler;
    private MailboxService _mailboxService;
    private QueryExecutor _leafQueryExecutor;

    // Server-level defaults, used only when the query itself does not set the corresponding option.
    // Each one is null when the matching server config is absent.
    @Nullable
    private Integer _numGroupsLimit;

    @Nullable
    private Integer _maxInitialResultHolderCapacity;

    @Nullable
    private Integer _minInitialIndexedTableCapacity;

    @Nullable
    private Integer _maxRowsInJoin;

    @Nullable
    private CommonConstants.MultiStageQueryRunner.JoinOverFlowMode _joinOverflowMode;

    // Only created when at least one time-series language is configured; null otherwise.
    @Nullable
    private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;

    /**
     * Reads the runner configuration and creates all services used by this runner.
     *
     * @param pinotConfiguration server configuration; must contain {@code pinot.query.runner.hostname}
     * @param instanceDataManager data manager handed to the leaf query executor
     * @param helixManager Helix manager later passed to leaf-stage compilation
     * @param serverMetrics metrics sink handed to the executors
     * @param tlsConfig optional TLS configuration for the mailbox service
     * @throws NullPointerException if the hostname config is missing
     * @throws NumberFormatException if one of the optional integer configs is not a valid integer
     * @throws RuntimeException wrapping any exception raised while initializing the leaf query executor or the
     *     time-series components
     */
    public void init(PinotConfiguration pinotConfiguration, InstanceDataManager instanceDataManager, HelixManager helixManager, ServerMetrics serverMetrics, @Nullable TlsConfig tlsConfig) {
        String hostname = pinotConfiguration.getProperty("pinot.query.runner.hostname");
        // Fail with a descriptive message instead of a bare NPE on startsWith() below.
        Preconditions.checkNotNull(hostname, "Missing required config: %s", "pinot.query.runner.hostname");
        if (hostname.startsWith("Server_")) {
            // The hostname may be given as a server instance id; strip the instance prefix.
            hostname = hostname.substring(CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH);
        }
        int port = pinotConfiguration.getProperty("pinot.query.runner.port", 0);
        _hostname = hostname;
        _port = port;
        _helixManager = helixManager;
        _serverMetrics = serverMetrics;

        _numGroupsLimit = parseOptionalInt(pinotConfiguration, "pinot.server.query.executor.num.groups.limit");
        _maxInitialResultHolderCapacity = parseOptionalInt(pinotConfiguration, "pinot.server.query.executor.max.init.group.holder.capacity");
        _minInitialIndexedTableCapacity = parseOptionalInt(pinotConfiguration, "pinot.server.query.executor.min.init.indexed.table.capacity");
        _maxRowsInJoin = parseOptionalInt(pinotConfiguration, "pinot.query.join.max.rows");
        String joinOverflowMode = pinotConfiguration.getProperty("pinot.query.join.overflow.mode");
        _joinOverflowMode = joinOverflowMode != null ? CommonConstants.MultiStageQueryRunner.JoinOverFlowMode.valueOf(joinOverflowMode) : null;

        _executorService = ExecutorServiceUtils.create(pinotConfiguration, "pinot.server.query.executor.multistage.executor", "query-runner-on-" + port, "cached");
        _opChainScheduler = new OpChainSchedulerService(_executorService);
        _mailboxService = new MailboxService(hostname, port, pinotConfiguration, tlsConfig);
        try {
            _leafQueryExecutor = new ServerQueryExecutorV1Impl();
            _leafQueryExecutor.init(pinotConfiguration.subset("pinot.server.query.executor"), instanceDataManager, serverMetrics);
            if (StringUtils.isNotBlank(pinotConfiguration.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
                _timeSeriesPhysicalPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, _executorService, serverMetrics);
                TimeSeriesBuilderFactoryProvider.init(pinotConfiguration);
            }
            LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads an optional integer config value.
     *
     * @return the parsed value, or {@code null} when the key is not configured
     * @throws NumberFormatException if the configured value is not a valid integer
     */
    @Nullable
    private static Integer parseOptionalInt(PinotConfiguration config, String key) {
        String value = config.getProperty(key);
        return value != null ? Integer.valueOf(value) : null;
    }

    /** Returns the executor service created in {@link #init}. */
    @VisibleForTesting
    public ExecutorService getExecutorService() {
        return _executorService;
    }

    /** Starts the mailbox service and then the leaf query executor. */
    public void start() {
        _mailboxService.start();
        _leafQueryExecutor.start();
    }

    /** Shuts down the leaf query executor and the mailbox service, then closes the executor service. */
    public void shutDown() {
        _leafQueryExecutor.shutDown();
        _mailboxService.shutdown();
        ExecutorServiceUtils.close(_executorService);
    }

    /**
     * Executes one worker's share of a stage.
     *
     * <p>Pipeline breakers are executed first. When they yield no error block, the stage is compiled into an
     * op-chain (leaf or intermediate, depending on the worker) and registered with the scheduler. Otherwise the
     * error block is sent to every mailbox of the receiving stage; send failures are logged and do not abort
     * the remaining sends.
     *
     * @param workerMetadata metadata of the worker executing the stage
     * @param stagePlan plan of the stage to execute
     * @param map request metadata; must contain numeric {@code requestId} and {@code timeoutMs} entries
     * @param threadExecutionContext optional parent execution context passed through to the op-chain
     * @throws NumberFormatException if {@code requestId} or {@code timeoutMs} is missing or not numeric
     */
    public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> map, @Nullable ThreadExecutionContext threadExecutionContext) {
        long requestId = Long.parseLong(map.get("requestId"));
        long deadlineMs = System.currentTimeMillis() + Long.parseLong(map.get("timeoutMs"));
        StageMetadata stageMetadata = stagePlan.getStageMetadata();
        Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), map);

        PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler, _mailboxService, workerMetadata, stagePlan, opChainMetadata, requestId, deadlineMs, threadExecutionContext);
        if (pipelineBreakerResult == null || pipelineBreakerResult.getErrorBlock() == null) {
            OpChainExecutionContext executionContext = new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult, threadExecutionContext);
            _opChainScheduler.register(workerMetadata.isLeafStageWorker() ? ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics, _leafQueryExecutor, _executorService) : PlanNodeToOpChain.convert(stagePlan.getRootNode(), executionContext));
            return;
        }

        // Pipeline breaker failed: propagate the error block downstream instead of running the stage.
        TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
        int stageId = stageMetadata.getStageId();
        LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId, stageId, errorBlock.getExceptions());
        // NOTE(review): getReceiverStageId() is invoked directly on the root node; presumably the root of a
        // stage is always a mailbox-send node - confirm against the PlanNode hierarchy.
        int receiverStageId = stagePlan.getRootNode().getReceiverStageId();
        for (RoutingInfo routingInfo : MailboxIdUtils.toRoutingInfos(requestId, stageId, workerMetadata.getWorkerId(), receiverStageId, ((MailboxInfos) workerMetadata.getMailboxInfosMap().get(Integer.valueOf(receiverStageId))).getMailboxInfos())) {
            try {
                _mailboxService.getSendingMailbox(routingInfo.getHostname(), routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, new StatMap<>(MailboxSendOperator.StatKey.class)).send(errorBlock);
            } catch (TimeoutException e) {
                // The trailing exception argument is logged as the throwable, not as a placeholder value.
                LOGGER.warn("Timed out sending error block to mailbox: {} for request: {}, stage: {}", routingInfo.getMailboxId(), requestId, stageId, e);
            } catch (Exception e) {
                LOGGER.error("Caught exception sending error block to mailbox: {} for request: {}, stage: {}", routingInfo.getMailboxId(), requestId, stageId, e);
            }
        }
    }

    /**
     * Compiles a serialized time-series plan and executes it on the runner's executor service, streaming the
     * single response (or an error response) to the observer.
     *
     * <p>Any failure, whether raised while preparing the query or while executing it, is logged and reported
     * to the observer as a response whose metadata carries {@code errorType} and {@code error}; the stream is
     * then completed.
     *
     * @param str serialized time-series plan
     * @param map request metadata; the deadline, time buckets, language and segment lists are read from it
     * @param streamObserver observer receiving the response
     */
    public void processTimeSeriesQuery(String str, Map<String, String> map, StreamObserver<Worker.TimeSeriesResponse> streamObserver) {
        Consumer<Throwable> errorHandler = t -> {
            Map<String, String> errorMetadata = new HashMap<>();
            errorMetadata.put("errorType", t.getClass().getSimpleName());
            errorMetadata.put("error", t.getMessage() == null ? "Unknown error: no message" : t.getMessage());
            streamObserver.onNext(Worker.TimeSeriesResponse.newBuilder().putAllMetadata(errorMetadata).build());
            streamObserver.onCompleted();
        };
        try {
            long deadlineMs = extractDeadlineMs(map);
            long nowMs = System.currentTimeMillis();
            // Report the actual remaining time (zero or negative here) rather than the absolute deadline.
            Preconditions.checkState(nowMs < deadlineMs, "Query timed out before getting processed in server. Remaining time: %s", deadlineMs - nowMs);
            // The visitor only exists when time-series languages are configured (see init()).
            Preconditions.checkState(_timeSeriesPhysicalPlanVisitor != null, "Time-series plan visitor is not initialized; no time-series languages are enabled on this server");
            TimeSeriesExecutionContext executionContext = new TimeSeriesExecutionContext(map.get("language"), extractTimeBuckets(map), extractPlanToSegmentMap(map), deadlineMs, map);
            BaseTimeSeriesOperator rootOperator = _timeSeriesPhysicalPlanVisitor.compile(TimeSeriesPlanSerde.deserialize(str), executionContext);
            _executorService.submit(() -> {
                try {
                    String serializedResponse = PinotBrokerTimeSeriesResponse.fromTimeSeriesBlock(rootOperator.nextBlock()).serialize();
                    streamObserver.onNext(Worker.TimeSeriesResponse.newBuilder().setPayload(ByteString.copyFrom(serializedResponse, StandardCharsets.UTF_8)).build());
                    streamObserver.onCompleted();
                } catch (Throwable t) {
                    // Log execution failures too, so they are visible on the server like preparation failures.
                    LOGGER.error("Error running time-series query", t);
                    errorHandler.accept(t);
                }
            });
        } catch (Throwable t) {
            LOGGER.error("Error running time-series query", t);
            errorHandler.accept(t);
        }
    }

    /**
     * Merges request metadata and stage custom properties into the metadata used for op-chain execution.
     *
     * <p>Stage custom properties override request metadata for the same key. For each tunable below, the value
     * from the merged options is kept when present; otherwise the server-level default (if configured) is
     * written into the result.
     *
     * @param customProperties custom properties of the stage
     * @param requestMetadata metadata sent along with the request
     * @return a new mutable map; neither input is modified
     */
    private Map<String, String> consolidateMetadata(Map<String, String> customProperties, Map<String, String> requestMetadata) {
        Map<String, String> merged = new HashMap<>();
        merged.putAll(requestMetadata);
        merged.putAll(customProperties);

        Integer numGroupsLimit = QueryOptionsUtils.getNumGroupsLimit(merged);
        if (numGroupsLimit == null) {
            numGroupsLimit = _numGroupsLimit;
        }
        if (numGroupsLimit != null) {
            merged.put("numGroupsLimit", Integer.toString(numGroupsLimit));
        }

        Integer maxInitialResultHolderCapacity = QueryOptionsUtils.getMaxInitialResultHolderCapacity(merged);
        if (maxInitialResultHolderCapacity == null) {
            maxInitialResultHolderCapacity = _maxInitialResultHolderCapacity;
        }
        if (maxInitialResultHolderCapacity != null) {
            merged.put("maxInitialResultHolderCapacity", Integer.toString(maxInitialResultHolderCapacity));
        }

        Integer minInitialIndexedTableCapacity = QueryOptionsUtils.getMinInitialIndexedTableCapacity(merged);
        if (minInitialIndexedTableCapacity == null) {
            minInitialIndexedTableCapacity = _minInitialIndexedTableCapacity;
        }
        if (minInitialIndexedTableCapacity != null) {
            merged.put("minInitialIndexedTableCapacity", Integer.toString(minInitialIndexedTableCapacity));
        }

        Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(merged);
        if (maxRowsInJoin == null) {
            maxRowsInJoin = _maxRowsInJoin;
        }
        if (maxRowsInJoin != null) {
            merged.put("maxRowsInJoin", Integer.toString(maxRowsInJoin));
        }

        CommonConstants.MultiStageQueryRunner.JoinOverFlowMode joinOverflowMode = QueryOptionsUtils.getJoinOverflowMode(merged);
        if (joinOverflowMode == null) {
            joinOverflowMode = _joinOverflowMode;
        }
        if (joinOverflowMode != null) {
            merged.put("joinOverflowMode", joinOverflowMode.name());
        }
        return merged;
    }

    /**
     * Forwards a cancel request to the op-chain scheduler.
     *
     * @param j id passed to the scheduler's cancel call
     */
    public void cancel(long j) {
        _opChainScheduler.cancel(j);
    }

    /**
     * Produces an explained version of a leaf stage plan.
     *
     * <p>Non-leaf workers and stages with pipeline breakers return the plan unchanged. Otherwise the leaf stage
     * is compiled, every plan node that maps to a {@link LeafStageTransferableBlockOperator} is replaced by
     * that operator's explain output, and the compiled op-chain is closed.
     *
     * @param workerMetadata metadata of the worker owning the stage
     * @param stagePlan plan of the stage to explain
     * @param map request metadata; must contain numeric {@code requestId} and {@code timeoutMs} entries
     * @return the original plan, or a new plan with the explained nodes substituted
     */
    public StagePlan explainQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map<String, String> map) {
        if (!workerMetadata.isLeafStageWorker()) {
            LOGGER.debug("Explain query on intermediate stages is a NOOP");
            return stagePlan;
        }
        long requestId = Long.parseLong(map.get("requestId"));
        long deadlineMs = System.currentTimeMillis() + Long.parseLong(map.get("timeoutMs"));
        StageMetadata stageMetadata = stagePlan.getStageMetadata();
        Map<String, String> opChainMetadata = consolidateMetadata(stageMetadata.getCustomProperties(), map);
        if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
            LOGGER.error("Pipeline breaker is not supported in explain query");
            return stagePlan;
        }

        Map<PlanNode, PlanNode> explainedNodes = new HashMap<>();
        OpChainExecutionContext executionContext = new OpChainExecutionContext(_mailboxService, requestId, deadlineMs, opChainMetadata, stageMetadata, workerMetadata, null, null);
        // The callback records the explain output per plan node; the op-chain itself is closed right away.
        ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, _helixManager, _serverMetrics, _leafQueryExecutor, _executorService, (planNode, multiStageOperator) -> {
            if (multiStageOperator instanceof LeafStageTransferableBlockOperator) {
                explainedNodes.put(planNode, ((LeafStageTransferableBlockOperator) multiStageOperator).explain());
            }
        }, true).close();
        return new StagePlan(substituteNode(stagePlan.getRootNode(), explainedNodes), stagePlan.getStageMetadata());
    }

    /**
     * Recursively replaces nodes of a plan tree according to the given mapping.
     *
     * <p>A node found in the mapping is replaced as a whole (its inputs are not visited). A node whose inputs
     * are all unchanged is returned as the same instance; otherwise a copy with the new inputs is created.
     *
     * @param node root of the (sub)tree to process
     * @param replacements mapping from original node to its replacement
     * @return the resulting (sub)tree root
     */
    private PlanNode substituteNode(PlanNode node, Map<PlanNode, ? extends PlanNode> replacements) {
        if (replacements.containsKey(node)) {
            return replacements.get(node);
        }
        List<PlanNode> inputs = node.getInputs();
        List<PlanNode> newInputs = new ArrayList<>(inputs.size());
        boolean changed = false;
        for (PlanNode input : inputs) {
            PlanNode newInput = substituteNode(input, replacements);
            newInputs.add(newInput);
            // Identity comparison: only rebuild the parent when a child instance was actually swapped.
            if (input != newInput) {
                changed = true;
            }
        }
        return changed ? node.withInputs(newInputs) : node;
    }

    /**
     * Reads the {@code deadlineMs} entry of the request metadata.
     *
     * @throws NumberFormatException if the entry is missing or not numeric
     */
    private long extractDeadlineMs(Map<String, String> metadata) {
        return Long.parseLong(metadata.get("deadlineMs"));
    }

    /**
     * Builds the time buckets from the {@code startTimeSeconds}, {@code windowSeconds} and {@code numElements}
     * entries of the request metadata.
     *
     * @throws NumberFormatException if any of the entries is missing or not numeric
     */
    private TimeBuckets extractTimeBuckets(Map<String, String> metadata) {
        long startTimeSeconds = Long.parseLong(metadata.get("startTimeSeconds"));
        long windowSeconds = Long.parseLong(metadata.get("windowSeconds"));
        int numElements = Integer.parseInt(metadata.get("numElements"));
        return TimeBuckets.ofSeconds(startTimeSeconds, Duration.ofSeconds(windowSeconds), numElements);
    }

    /**
     * Collects the segment lists encoded in the request metadata.
     *
     * <p>Every entry whose key is recognized as a segment-list key is decoded; its value is split on commas and
     * each element is stripped of surrounding whitespace.
     *
     * @return mapping from decoded segment-list key to its segment names
     */
    private Map<String, List<String>> extractPlanToSegmentMap(Map<String, String> metadata) {
        Map<String, List<String>> planToSegments = new HashMap<>();
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
            if (TimeSeriesPlanConstants.WorkerRequestMetadataKeys.isKeySegmentList(entry.getKey())) {
                List<String> segments = Stream.of(entry.getValue().split(",")).map(String::strip).collect(Collectors.toList());
                planToSegments.put(TimeSeriesPlanConstants.WorkerRequestMetadataKeys.decodeSegmentListKey(entry.getKey()), segments);
            }
        }
        return planToSegments;
    }
}
