package org.apache.pinot.broker.grpc;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.net.ssl.KeyManagerFactory;
import nl.altindag.ssl.SSLFactory;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.compression.CompressionFactory;
import org.apache.pinot.common.compression.Compressor;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Broker;
import org.apache.pinot.common.proto.PinotQueryBrokerGrpc;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.encoder.ResponseEncoder;
import org.apache.pinot.common.response.encoder.ResponseEncoderFactory;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.common.utils.tls.RenewableTlsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.trace.DefaultRequestContext;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/grpc/BrokerGrpcServer.class */
public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(BrokerGrpcServer.class);
    private final String _brokerId;
    private final Server _server;
    private final int _grpcPort;
    private final int _secureGrpcPort;
    private final GrpcConfig _queryClientConfig;
    private final BrokerMetrics _brokerMetrics;
    private final BrokerRequestHandler _brokerRequestHandler;

    /* loaded from: input_file:org/apache/pinot/broker/grpc/BrokerGrpcServer$BrokerGrpcTransportFilter.class */
    private class BrokerGrpcTransportFilter extends ServerTransportFilter {
        private BrokerGrpcTransportFilter() {
        }

        public Attributes transportReady(Attributes attributes) {
            BrokerGrpcServer.LOGGER.info("gRPC transportReady: REMOTE_ADDR {}", attributes != null ? attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) : "null");
            BrokerGrpcServer.this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_TRANSPORT_READY, 1L);
            return super.transportReady(attributes);
        }

        public void transportTerminated(Attributes attributes) {
            if (attributes != null) {
                BrokerGrpcServer.LOGGER.info("gRPC transportTerminated: REMOTE_ADDR {}", attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
                BrokerGrpcServer.this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_TRANSPORT_TERMINATED, 1L);
            }
        }
    }

    public BrokerGrpcServer(PinotConfiguration pinotConfiguration, String str, BrokerMetrics brokerMetrics, BrokerRequestHandler brokerRequestHandler) {
        this._brokerMetrics = brokerMetrics;
        this._grpcPort = pinotConfiguration.getProperty("pinot.broker.grpc.port", -1);
        this._queryClientConfig = createQueryClientConfig(pinotConfiguration);
        LOGGER.info("gRPC query client config: usePlainText {}", Boolean.valueOf(this._queryClientConfig.isUsePlainText()));
        this._secureGrpcPort = pinotConfiguration.getProperty("pinot.broker.grpc.tls.port", -1);
        if (this._secureGrpcPort > 0) {
            try {
                TlsConfig extractTlsConfig = TlsUtils.extractTlsConfig(pinotConfiguration, "pinot.broker.tls");
                LOGGER.info("Creating Secure gRPC Server in port {}", Integer.valueOf(this._secureGrpcPort));
                this._server = NettyServerBuilder.forPort(this._secureGrpcPort).sslContext(buildGRpcSslContext(extractTlsConfig)).addService(this).addTransportFilter(new BrokerGrpcTransportFilter()).build();
            } catch (Exception e) {
                throw new RuntimeException("Failed to start secure grpcQueryServer", e);
            }
        } else if (this._grpcPort > 0) {
            LOGGER.info("Creating plain text gRPC Server in port {}", Integer.valueOf(this._grpcPort));
            this._server = ServerBuilder.forPort(this._grpcPort).addService(this).addTransportFilter(new BrokerGrpcTransportFilter()).build();
        } else {
            LOGGER.info("Not creating gRPC Server due to the grpc port is {} and secureGrpcPort is {}", Integer.valueOf(this._grpcPort), Integer.valueOf(this._secureGrpcPort));
            this._server = null;
        }
        this._brokerId = str;
        this._brokerRequestHandler = brokerRequestHandler;
        LOGGER.info("Initialized BrokerGrpcServer on port: {}", Integer.valueOf(this._grpcPort));
    }

    public void start() {
        if (this._server == null) {
            LOGGER.info("BrokerGrpcServer is not configured, nothing to start");
            return;
        }
        LOGGER.info("Starting BrokerGrpcServer");
        try {
            this._server.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() {
        if (this._server == null) {
            LOGGER.info("BrokerGrpcServer is not running, nothing to shutdown");
            return;
        }
        LOGGER.info("Shutting down BrokerGrpcServer");
        try {
            this._server.shutdown().awaitTermination();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void submit(Broker.BrokerRequest brokerRequest, StreamObserver<Broker.BrokerResponse> streamObserver) {
        if (this._server == null) {
            LOGGER.info("BrokerGrpcServer is not running, nothing to handle request");
            streamObserver.onError(new RuntimeException("BrokerGrpcServer is not running"));
            return;
        }
        System.nanoTime();
        this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_QUERIES, 1L);
        this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_BYTES_RECEIVED, brokerRequest.getSerializedSize());
        String sql = brokerRequest.getSql();
        Map metadataMap = brokerRequest.getMetadataMap();
        JsonNode newObjectNode = JsonUtils.newObjectNode();
        newObjectNode.put("sql", sql);
        for (Map.Entry entry : metadataMap.entrySet()) {
            newObjectNode.put((String) entry.getKey(), (String) entry.getValue());
        }
        try {
            try {
                BrokerResponse handleRequest = this._brokerRequestHandler.handleRequest(newObjectNode, RequestUtils.parseQuery(sql, newObjectNode), GrpcRequesterIdentity.fromRequest(brokerRequest), new DefaultRequestContext(), null);
                handleRequest.emitBrokerResponseMetrics(this._brokerMetrics);
                ResultTable resultTable = handleRequest.getResultTable();
                if (resultTable == null) {
                    if (handleRequest.getExceptionsSize() > 0) {
                        this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_QUERY_EXCEPTIONS, 1L);
                    }
                    try {
                        streamObserver.onNext(Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(handleRequest.toJsonString().getBytes())).build());
                        streamObserver.onCompleted();
                        return;
                    } catch (IOException e) {
                        streamObserver.onCompleted();
                        throw new RuntimeException(e);
                    }
                }
                try {
                    streamObserver.onNext(Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(handleRequest.toMetadataJsonString().getBytes())).build());
                    try {
                        streamObserver.onNext(Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(resultTable.getDataSchema().toBytes())).build());
                        int parseInt = Integer.parseInt((String) metadataMap.getOrDefault("blockRowSize", String.valueOf(10000)));
                        String str = (String) metadataMap.getOrDefault("compression", "ZSTD");
                        Compressor compressor = CompressionFactory.getCompressor(str);
                        String str2 = (String) metadataMap.getOrDefault("encoding", "JSON");
                        ResponseEncoder responseEncoder = ResponseEncoderFactory.getResponseEncoder(str2);
                        int i = 0;
                        while (true) {
                            int i2 = i;
                            if (i2 >= resultTable.getRows().size()) {
                                streamObserver.onCompleted();
                                return;
                            }
                            try {
                                int min = Math.min(parseInt, resultTable.getRows().size() - i2);
                                byte[] encodeResultTable = responseEncoder.encodeResultTable(resultTable, i2, min);
                                byte[] compress = compressor.compress(encodeResultTable);
                                streamObserver.onNext(Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(compress)).putMetadata("originalSize", String.valueOf(encodeResultTable.length)).putMetadata("compressedSize", String.valueOf(compress.length)).putMetadata("rowSize", String.valueOf(min)).putMetadata("compression", str).putMetadata("encoding", str2).build());
                                i = i2 + parseInt;
                            } catch (Exception e2) {
                                streamObserver.onError(e2);
                                throw new RuntimeException(e2);
                            }
                        }
                    } catch (IOException e3) {
                        streamObserver.onError(e3);
                        throw new RuntimeException(e3);
                    }
                } catch (IOException e4) {
                    streamObserver.onError(e4);
                    throw new RuntimeException(e4);
                }
            } catch (Exception e5) {
                this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.GRPC_QUERY_EXCEPTIONS, 1L);
                LOGGER.error("Error handling DQL request:\n{}\nException: {}", newObjectNode, e5);
                streamObserver.onError(Status.INTERNAL.withDescription(e5.getMessage()).withCause(e5).asRuntimeException());
                throw new RuntimeException(e5);
            }
        } catch (Exception e6) {
            try {
                BrokerResponseNative brokerResponseNative = new BrokerResponseNative(QueryErrorCode.SQL_PARSING, e6.getMessage());
                Broker.BrokerResponse build = Broker.BrokerResponse.newBuilder().setPayload(ByteString.copyFrom(brokerResponseNative.toJsonString().getBytes())).build();
                brokerResponseNative.emitBrokerResponseMetrics(this._brokerMetrics);
                streamObserver.onNext(build);
                streamObserver.onCompleted();
            } catch (IOException e7) {
                streamObserver.onCompleted();
                throw new RuntimeException(e7);
            }
        }
    }

    @VisibleForTesting
    static SslContext buildGRpcSslContext(TlsConfig tlsConfig) throws Exception {
        LOGGER.info("Building gRPC SSL context with");
        if (tlsConfig.getKeyStorePath() == null) {
            throw new IllegalArgumentException("Must provide key store path for secured gRPC server");
        }
        SSLFactory createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores = RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(tlsConfig);
        SslContextBuilder sslProvider = SslContextBuilder.forServer((KeyManagerFactory) createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores.getKeyManagerFactory().get()).sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
        Optional trustManagerFactory = createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores.getTrustManagerFactory();
        Objects.requireNonNull(sslProvider);
        trustManagerFactory.ifPresent(sslProvider::trustManager);
        if (tlsConfig.isClientAuthEnabled()) {
            sslProvider.clientAuth(ClientAuth.REQUIRE);
        }
        return GrpcSslContexts.configure(sslProvider).build();
    }

    @VisibleForTesting
    static GrpcConfig createQueryClientConfig(PinotConfiguration pinotConfiguration) {
        Map map = pinotConfiguration.toMap();
        map.put("usePlainText", Boolean.valueOf(!pinotConfiguration.getProperty("pinot.broker.grpc.tls.enabled", false)));
        Stream filter = map.keySet().stream().filter(str -> {
            return str.startsWith("pinot.broker.grpctls");
        });
        Function function = str2 -> {
            return "tls." + str2.substring("pinot.broker.grpctls".length() + 1);
        };
        Objects.requireNonNull(map);
        map.putAll((Map) filter.collect(Collectors.toMap(function, (v1) -> {
            return r2.get(v1);
        })));
        return new GrpcConfig(map);
    }

    public static boolean isEnabled(PinotConfiguration pinotConfiguration) {
        return pinotConfiguration.getProperty("pinot.broker.grpc.port", -1) > 0 || pinotConfiguration.getProperty("pinot.broker.grpc.tls.port", -1) > 0;
    }

    public static int getGrpcPort(PinotConfiguration pinotConfiguration) {
        int property = pinotConfiguration.getProperty("pinot.broker.grpc.tls.port", -1);
        return property > 0 ? property : pinotConfiguration.getProperty("pinot.broker.grpc.port", -1);
    }
}
