package org.apache.pinot.core.transport;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/pinot/core/transport/InstanceRequestHandler.class */
public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InstanceRequestHandler.class);
    private static final int SLOW_QUERY_LATENCY_THRESHOLD_MS = 100;
    private static final int LARGE_RESPONSE_SIZE_THRESHOLD_BYTES = 104857600;
    private final String _instanceName;
    private final ThreadLocal<TDeserializer> _deserializer = ThreadLocal.withInitial(() -> {
        try {
            return new TDeserializer(new TCompactProtocol.Factory());
        } catch (TTransportException e) {
            throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
        }
    });
    private final QueryScheduler _queryScheduler;
    private final ServerMetrics _serverMetrics;
    private final AccessControl _accessControl;
    private final Map<String, Future<byte[]>> _queryFuturesById;

    public InstanceRequestHandler(String str, PinotConfiguration pinotConfiguration, QueryScheduler queryScheduler, ServerMetrics serverMetrics, AccessControl accessControl) {
        this._instanceName = str;
        this._queryScheduler = queryScheduler;
        this._serverMetrics = serverMetrics;
        this._accessControl = accessControl;
        if (!Boolean.parseBoolean(pinotConfiguration.getProperty("pinot.server.enable.query.cancellation"))) {
            this._queryFuturesById = null;
        } else {
            this._queryFuturesById = new ConcurrentHashMap();
            LOGGER.info("Enable query cancellation");
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        super.userEventTriggered(channelHandlerContext, obj);
        if (!(obj instanceof SslHandshakeCompletionEvent) || this._accessControl.isAuthorizedChannel(channelHandlerContext)) {
            return;
        }
        channelHandlerContext.disconnect();
        LOGGER.error("Exception while processing instance request: Unauthorized access to pinot-server");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        long j = 0;
        TBase tBase = null;
        byte[] bArr = null;
        String str = null;
        try {
            int readableBytes = byteBuf.readableBytes();
            tBase = new InstanceRequest();
            bArr = new byte[readableBytes];
            j = System.currentTimeMillis();
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1L);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, readableBytes);
            byteBuf.readBytes(bArr);
            this._deserializer.get().deserialize(tBase, bArr);
            ServerQueryRequest serverQueryRequest = new ServerQueryRequest(tBase, this._serverMetrics, j);
            serverQueryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, j).stopAndRecord();
            str = serverQueryRequest.getTableNameWithType();
            submitQuery(serverQueryRequest, channelHandlerContext, str, j, tBase);
        } catch (Exception e) {
            if (e instanceof TException) {
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1L);
            }
            String hexString = bArr != null ? BytesUtils.toHexString(bArr) : "";
            long requestId = tBase != null ? tBase.getRequestId() : 0L;
            LOGGER.error("Exception while processing instance request: {}", hexString, e);
            sendErrorResponse(channelHandlerContext, requestId, str, j, DataTableBuilderFactory.getEmptyDataTable(), e);
        }
    }

    @VisibleForTesting
    void submitQuery(ServerQueryRequest serverQueryRequest, ChannelHandlerContext channelHandlerContext, String str, long j, InstanceRequest instanceRequest) {
        Future<byte[]> submit = this._queryScheduler.submit(serverQueryRequest);
        if (this._queryFuturesById != null) {
            String queryId = serverQueryRequest.getQueryId();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Keep track of running query: {}", queryId);
            }
            this._queryFuturesById.put(queryId, submit);
        }
        Futures.addCallback(submit, createCallback(channelHandlerContext, str, j, instanceRequest, serverQueryRequest), MoreExecutors.directExecutor());
    }

    private FutureCallback<byte[]> createCallback(final ChannelHandlerContext channelHandlerContext, final String str, final long j, final InstanceRequest instanceRequest, final ServerQueryRequest serverQueryRequest) {
        return new FutureCallback<byte[]>() { // from class: org.apache.pinot.core.transport.InstanceRequestHandler.1
            public void onSuccess(@Nullable byte[] bArr) {
                if (InstanceRequestHandler.this._queryFuturesById != null) {
                    String queryId = serverQueryRequest.getQueryId();
                    if (InstanceRequestHandler.LOGGER.isDebugEnabled()) {
                        InstanceRequestHandler.LOGGER.debug("Remove track of running query: {} on success", queryId);
                    }
                    InstanceRequestHandler.this._queryFuturesById.remove(queryId);
                }
                if (bArr != null) {
                    InstanceRequestHandler.this.sendResponse(channelHandlerContext, serverQueryRequest.getTableNameWithType(), j, bArr);
                } else {
                    InstanceRequestHandler.this.sendErrorResponse(channelHandlerContext, serverQueryRequest.getRequestId(), str, j, DataTableBuilderFactory.getEmptyDataTable(), new Exception("Null query response."));
                }
            }

            public void onFailure(Throwable th) {
                Exception exc;
                if (InstanceRequestHandler.this._queryFuturesById != null) {
                    String queryId = serverQueryRequest.getQueryId();
                    if (InstanceRequestHandler.LOGGER.isDebugEnabled()) {
                        InstanceRequestHandler.LOGGER.debug("Remove track of running query: {} on failure", queryId);
                    }
                    InstanceRequestHandler.this._queryFuturesById.remove(queryId);
                }
                if (th instanceof Exception) {
                    exc = (Exception) th;
                    if (exc instanceof CancellationException) {
                        InstanceRequestHandler.LOGGER.info("Query: {} got cancelled", serverQueryRequest.getQueryId());
                    } else {
                        InstanceRequestHandler.LOGGER.error("Exception while processing instance request", exc);
                    }
                } else {
                    InstanceRequestHandler.LOGGER.error("Error while processing instance request", th);
                    exc = new Exception(th);
                }
                InstanceRequestHandler.this.sendErrorResponse(channelHandlerContext, instanceRequest.getRequestId(), str, j, DataTableBuilderFactory.getEmptyDataTable(), exc);
            }
        };
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        String str = "Unhandled Exception in " + getClass().getCanonicalName();
        LOGGER.error(str, th);
        sendErrorResponse(channelHandlerContext, 0L, null, System.currentTimeMillis(), DataTableBuilderFactory.getEmptyDataTable(), new Exception(str, th));
    }

    public boolean cancelQuery(String str) {
        Preconditions.checkState(this._queryFuturesById != null, "Query cancellation is not enabled on server");
        Future<byte[]> future = this._queryFuturesById.get(str);
        if (future == null) {
            return false;
        }
        boolean isDone = future.isDone();
        if (!isDone) {
            future.cancel(true);
        }
        if (!LOGGER.isDebugEnabled()) {
            return true;
        }
        LOGGER.debug("Cancelled query: {} that's done: {}", str, Boolean.valueOf(isDone));
        return true;
    }

    public Set<String> getRunningQueryIds() {
        Preconditions.checkState(this._queryFuturesById != null, "Query cancellation is not enabled on server");
        return new HashSet(this._queryFuturesById.keySet());
    }

    private void sendErrorResponse(ChannelHandlerContext channelHandlerContext, long j, String str, long j2, DataTable dataTable, Exception exc) {
        boolean z = exc instanceof CancellationException;
        try {
            try {
                dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), Long.toString(j));
                if (z) {
                    dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, "Query cancelled on: " + this._instanceName + " " + exc));
                } else {
                    dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, "Query execution error on: " + this._instanceName + " " + exc));
                }
                sendResponse(channelHandlerContext, str, j2, dataTable.toBytes());
                if (!z) {
                    LOGGER.error("Query processing error: ", exc);
                }
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            } catch (Exception e) {
                LOGGER.error("Exception while sending query processing error to Broker.", e);
                if (!z) {
                    LOGGER.error("Query processing error: ", exc);
                }
                this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            }
        } catch (Throwable th) {
            if (!z) {
                LOGGER.error("Query processing error: ", exc);
            }
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1L);
            throw th;
        }
    }

    private void sendResponse(ChannelHandlerContext channelHandlerContext, String str, long j, byte[] bArr) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = (int) (currentTimeMillis - j);
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(bArr)).addListener(future -> {
            long currentTimeMillis2 = System.currentTimeMillis();
            int i2 = (int) (currentTimeMillis2 - currentTimeMillis);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1L);
            this._serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, bArr.length);
            this._serverMetrics.addTimedTableValue(str, ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, i2, TimeUnit.MILLISECONDS);
            int i3 = (int) (currentTimeMillis2 - j);
            if (i3 > 100) {
                LOGGER.info("Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
            }
            if (bArr.length > LARGE_RESPONSE_SIZE_THRESHOLD_BYTES) {
                LOGGER.warn("Large query: response size in bytes: {}, table name {}", Integer.valueOf(bArr.length), str);
                ServerMetrics.get().addMeteredTableValue(str, ServerMeter.LARGE_QUERY_RESPONSES_SENT, 1L);
            }
        });
    }
}
