package org.apache.pinot.query.service.server;

import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/server/QueryServer.class */
public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
    private static final int MAX_INBOUND_MESSAGE_SIZE = 67108864;
    private final int _port;
    private final QueryRunner _queryRunner;

    @Nullable
    private final TlsConfig _tlsConfig;
    private final ExecutorService _querySubmissionExecutorService;
    private Server _server = null;

    public QueryServer(int i, QueryRunner queryRunner, @Nullable TlsConfig tlsConfig) {
        this._port = i;
        this._queryRunner = queryRunner;
        this._tlsConfig = tlsConfig;
        this._querySubmissionExecutorService = Executors.newCachedThreadPool(new NamedThreadFactory("query_submission_executor_on_" + this._port + "_port"));
    }

    public void start() {
        LOGGER.info("Starting QueryServer");
        try {
            if (this._server == null) {
                if (this._tlsConfig == null) {
                    this._server = ServerBuilder.forPort(this._port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
                } else {
                    this._server = NettyServerBuilder.forPort(this._port).addService(this).sslContext(GrpcQueryServer.buildGrpcSslContext(this._tlsConfig)).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
                }
                LOGGER.info("Initialized QueryServer on port: {}", Integer.valueOf(this._port));
            }
            this._queryRunner.start();
            this._server.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        LOGGER.info("Shutting down QueryServer");
        try {
            this._queryRunner.shutDown();
            if (this._server != null) {
                this._server.shutdown();
                this._server.awaitTermination();
            }
            this._querySubmissionExecutorService.shutdown();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void submit(Worker.QueryRequest queryRequest, StreamObserver<Worker.QueryResponse> streamObserver) {
        try {
            Map fromProtoProperties = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
            long parseLong = Long.parseLong((String) fromProtoProperties.get("requestId"));
            long currentTimeMillis = System.currentTimeMillis() + Long.parseLong((String) fromProtoProperties.get("timeoutMs"));
            Tracing.ThreadAccountantOps.setupRunner(Long.toString(parseLong), ThreadExecutionContext.TaskType.MSE);
            ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext();
            try {
                try {
                    forEachStage(queryRequest, parseLong, currentTimeMillis, (stagePlan, workerMetadata) -> {
                        this._queryRunner.processQuery(workerMetadata, stagePlan, fromProtoProperties, threadExecutionContext);
                        return null;
                    }, list -> {
                    });
                    Tracing.getThreadAccountant().clear();
                    streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("OK", "").build());
                    streamObserver.onCompleted();
                } catch (InterruptedException | RuntimeException | ExecutionException | TimeoutException e) {
                    LOGGER.error("Caught exception while submitting request: {}", Long.valueOf(parseLong), e);
                    streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e)).build());
                    streamObserver.onCompleted();
                    Tracing.getThreadAccountant().clear();
                }
            } catch (Throwable th) {
                Tracing.getThreadAccountant().clear();
                throw th;
            }
        } catch (Exception e2) {
            LOGGER.error("Caught exception while deserializing request metadata", e2);
            streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e2)).build());
            streamObserver.onCompleted();
        }
    }

    public void explain(Worker.QueryRequest queryRequest, StreamObserver<Worker.ExplainResponse> streamObserver) {
        try {
            Map fromProtoProperties = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
            long parseLong = Long.parseLong((String) fromProtoProperties.get("requestId"));
            try {
                forEachStage(queryRequest, parseLong, System.currentTimeMillis() + Long.parseLong((String) fromProtoProperties.get("timeoutMs")), (stagePlan, workerMetadata) -> {
                    return this._queryRunner.explainQuery(workerMetadata, stagePlan, fromProtoProperties);
                }, list -> {
                    Worker.ExplainResponse.Builder newBuilder = Worker.ExplainResponse.newBuilder();
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        StagePlan stagePlan2 = (StagePlan) it.next();
                        ByteString byteString = PlanNodeSerializer.process(stagePlan2.getRootNode()).toByteString();
                        StageMetadata stageMetadata = stagePlan2.getStageMetadata();
                        newBuilder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(byteString).setStageMetadata(Worker.StageMetadata.newBuilder().setStageId(stageMetadata.getStageId()).addAllWorkerMetadata(QueryPlanSerDeUtils.toProtoWorkerMetadataList(stageMetadata.getWorkerMetadataList())).setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(stageMetadata.getCustomProperties()))));
                    }
                    newBuilder.putMetadata("OK", "");
                    streamObserver.onNext(newBuilder.build());
                });
                streamObserver.onNext(Worker.ExplainResponse.newBuilder().putMetadata("OK", "").build());
                streamObserver.onCompleted();
            } catch (InterruptedException | RuntimeException | ExecutionException | TimeoutException e) {
                LOGGER.error("Caught exception while submitting request: {}", Long.valueOf(parseLong), e);
                streamObserver.onNext(Worker.ExplainResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e)).build());
                streamObserver.onCompleted();
            }
        } catch (Exception e2) {
            LOGGER.error("Caught exception while deserializing request metadata", e2);
            streamObserver.onNext(Worker.ExplainResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e2)).build());
            streamObserver.onCompleted();
        }
    }

    public void submitTimeSeries(Worker.TimeSeriesQueryRequest timeSeriesQueryRequest, StreamObserver<Worker.TimeSeriesResponse> streamObserver) {
        this._queryRunner.processTimeSeriesQuery(timeSeriesQueryRequest.getDispatchPlan(0), timeSeriesQueryRequest.getMetadataMap(), streamObserver);
    }

    public void cancel(Worker.CancelRequest cancelRequest, StreamObserver<Worker.CancelResponse> streamObserver) {
        try {
            this._queryRunner.cancel(cancelRequest.getRequestId());
        } catch (Throwable th) {
            LOGGER.error("Caught exception while cancelling opChain for request: {}", Long.valueOf(cancelRequest.getRequestId()), th);
        }
        streamObserver.onCompleted();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <W> void submitStage(Worker.StagePlan stagePlan, long j, long j2, BiFunction<StagePlan, WorkerMetadata, W> biFunction, Consumer<W> consumer) {
        try {
            StagePlan fromProtoStagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(stagePlan);
            StageMetadata stageMetadata = fromProtoStagePlan.getStageMetadata();
            List workerMetadataList = stageMetadata.getWorkerMetadataList();
            int size = workerMetadataList.size();
            CompletableFuture[] completableFutureArr = new CompletableFuture[size];
            for (int i = 0; i < size; i++) {
                try {
                    WorkerMetadata workerMetadata = (WorkerMetadata) workerMetadataList.get(i);
                    completableFutureArr[i] = CompletableFuture.supplyAsync(() -> {
                        return biFunction.apply(fromProtoStagePlan, workerMetadata);
                    }, this._querySubmissionExecutorService);
                } finally {
                    for (CompletableFuture completableFuture : completableFutureArr) {
                        if (!completableFuture.isDone()) {
                            completableFuture.cancel(true);
                        }
                    }
                }
            }
            for (int i2 = 0; i2 < size; i2++) {
                try {
                    try {
                        consumer.accept(completableFutureArr[i2].get(j2 - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
                    } catch (Exception e) {
                        stageMetadata.getStageId();
                        RuntimeException runtimeException = new RuntimeException("Caught exception while submitting request: " + j + ", stage: " + runtimeException, e);
                        throw runtimeException;
                    }
                } catch (TimeoutException e2) {
                    stageMetadata.getStageId();
                    RuntimeException runtimeException2 = new RuntimeException("Timeout while submitting request: " + j + ", stage: " + runtimeException2, e2);
                    throw runtimeException2;
                }
            }
        } catch (Exception e3) {
            throw new RuntimeException(String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", Long.valueOf(j), Integer.valueOf(stagePlan.getStageMetadata().getStageId())), e3);
        }
    }

    <W> void forEachStage(Worker.QueryRequest queryRequest, long j, long j2, BiFunction<StagePlan, WorkerMetadata, W> biFunction, Consumer<List<W>> consumer) throws ExecutionException, InterruptedException, TimeoutException {
        List<Worker.StagePlan> stagePlanList = queryRequest.getStagePlanList();
        ArrayList<CompletableFuture> arrayList = new ArrayList(stagePlanList.size());
        for (Worker.StagePlan stagePlan : stagePlanList) {
            arrayList.add(CompletableFuture.supplyAsync(() -> {
                ArrayList arrayList2 = new ArrayList();
                Objects.requireNonNull(arrayList2);
                submitStage(stagePlan, j, j2, biFunction, arrayList2::add);
                return arrayList2;
            }, this._querySubmissionExecutorService));
        }
        try {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                consumer.accept((List) ((CompletableFuture) it.next()).get(j2 - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
            }
        } finally {
            for (CompletableFuture completableFuture : arrayList) {
                if (!completableFuture.isDone()) {
                    completableFuture.cancel(true);
                }
            }
        }
    }
}
