package org.apache.pinot.query.service.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/server/QueryServer.class */
public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
    private static final int MAX_INBOUND_MESSAGE_SIZE = 67108864;
    private final int _port;
    private final QueryRunner _queryRunner;
    private final ExecutorService _querySubmissionExecutorService;
    private Server _server = null;

    public QueryServer(int i, QueryRunner queryRunner) {
        this._port = i;
        this._queryRunner = queryRunner;
        this._querySubmissionExecutorService = Executors.newCachedThreadPool(new NamedThreadFactory("query_submission_executor_on_" + this._port + "_port"));
    }

    public void start() {
        LOGGER.info("Starting QueryServer");
        try {
            if (this._server == null) {
                this._server = ServerBuilder.forPort(this._port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
                LOGGER.info("Initialized QueryServer on port: {}", Integer.valueOf(this._port));
            }
            this._queryRunner.start();
            this._server.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        LOGGER.info("Shutting down QueryServer");
        try {
            this._queryRunner.shutDown();
            if (this._server != null) {
                this._server.shutdown();
                this._server.awaitTermination();
            }
            this._querySubmissionExecutorService.shutdown();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void submit(Worker.QueryRequest queryRequest, StreamObserver<Worker.QueryResponse> streamObserver) {
        try {
            Map fromProtoProperties = QueryPlanSerDeUtils.fromProtoProperties(queryRequest.getMetadata());
            long parseLong = Long.parseLong((String) fromProtoProperties.get("requestId"));
            long currentTimeMillis = System.currentTimeMillis() + Long.parseLong((String) fromProtoProperties.get("timeoutMs"));
            List stagePlanList = queryRequest.getStagePlanList();
            int size = stagePlanList.size();
            CompletableFuture[] completableFutureArr = new CompletableFuture[size];
            for (int i = 0; i < size; i++) {
                Worker.StagePlan stagePlan = (Worker.StagePlan) stagePlanList.get(i);
                completableFutureArr[i] = CompletableFuture.runAsync(() -> {
                    try {
                        StagePlan fromProtoStagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(stagePlan);
                        StageMetadata stageMetadata = fromProtoStagePlan.getStageMetadata();
                        List workerMetadataList = stageMetadata.getWorkerMetadataList();
                        int size2 = workerMetadataList.size();
                        CompletableFuture[] completableFutureArr2 = new CompletableFuture[size2];
                        for (int i2 = 0; i2 < size2; i2++) {
                            try {
                                WorkerMetadata workerMetadata = (WorkerMetadata) workerMetadataList.get(i2);
                                completableFutureArr2[i2] = CompletableFuture.runAsync(() -> {
                                    this._queryRunner.processQuery(workerMetadata, fromProtoStagePlan, fromProtoProperties);
                                }, this._querySubmissionExecutorService);
                            } catch (Throwable th) {
                                for (CompletableFuture completableFuture : completableFutureArr2) {
                                    if (!completableFuture.isDone()) {
                                        completableFuture.cancel(true);
                                    }
                                }
                                throw th;
                            }
                        }
                        try {
                            CompletableFuture.allOf(completableFutureArr2).get(currentTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                            for (CompletableFuture completableFuture2 : completableFutureArr2) {
                                if (!completableFuture2.isDone()) {
                                    completableFuture2.cancel(true);
                                }
                            }
                        } catch (Exception e) {
                            throw new RuntimeException(String.format("Caught exception while submitting request: %d, stage: %d", Long.valueOf(parseLong), Integer.valueOf(stageMetadata.getStageId())), e);
                        }
                    } catch (Exception e2) {
                        throw new RuntimeException(String.format("Caught exception while deserializing stage plan for request: %d, stage: %d", Long.valueOf(parseLong), Integer.valueOf(stagePlan.getStageMetadata().getStageId())), e2);
                    }
                }, this._querySubmissionExecutorService);
            }
            try {
                try {
                    CompletableFuture.allOf(completableFutureArr).get(currentTimeMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    for (CompletableFuture completableFuture : completableFutureArr) {
                        if (!completableFuture.isDone()) {
                            completableFuture.cancel(true);
                        }
                    }
                    streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("OK", "").build());
                    streamObserver.onCompleted();
                } catch (Exception e) {
                    LOGGER.error("Caught exception while submitting request: {}", Long.valueOf(parseLong), e);
                    streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e)).build());
                    streamObserver.onCompleted();
                    for (CompletableFuture completableFuture2 : completableFutureArr) {
                        if (!completableFuture2.isDone()) {
                            completableFuture2.cancel(true);
                        }
                    }
                }
            } catch (Throwable th) {
                for (CompletableFuture completableFuture3 : completableFutureArr) {
                    if (!completableFuture3.isDone()) {
                        completableFuture3.cancel(true);
                    }
                }
                throw th;
            }
        } catch (Exception e2) {
            LOGGER.error("Caught exception while deserializing request metadata", e2);
            streamObserver.onNext(Worker.QueryResponse.newBuilder().putMetadata("ERROR", QueryException.getTruncatedStackTrace(e2)).build());
            streamObserver.onCompleted();
        }
    }

    public void cancel(Worker.CancelRequest cancelRequest, StreamObserver<Worker.CancelResponse> streamObserver) {
        try {
            this._queryRunner.cancel(cancelRequest.getRequestId());
        } catch (Throwable th) {
            LOGGER.error("Caught exception while cancelling opChain for request: {}", Long.valueOf(cancelRequest.getRequestId()), th);
        }
        streamObserver.onCompleted();
    }
}
