package org.apache.pinot.connector.spark.common.reader;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import java.io.Closeable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.connector.spark.common.Logging;
import org.apache.pinot.connector.spark.common.partition.PinotSplit;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: PinotGrpcServerDataFetcher.scala */
@ScalaSignature(bytes = "\u0006\u0001Q4Q!\u0004\b\u0001\u001dqA\u0001b\f\u0001\u0003\u0002\u0003\u0006I!\r\u0005\u0006o\u0001!\t\u0001\u000f\u0005\by\u0001\u0011\r\u0011\"\u0003>\u0011\u0019)\u0005\u0001)A\u0005}!9a\t\u0001b\u0001\n\u00139\u0005BB*\u0001A\u0003%\u0001\nC\u0003U\u0001\u0011\u0005Q\u000bC\u0003i\u0001\u0011\u0005\u0011nB\u0003n\u001d!\u0005aNB\u0003\u000e\u001d!\u0005q\u000eC\u00038\u0015\u0011\u0005\u0001\u000fC\u0003r\u0015\u0011\u0005!O\u0001\u000eQS:|Go\u0012:qGN+'O^3s\t\u0006$\u0018MR3uG\",'O\u0003\u0002\u0010!\u00051!/Z1eKJT!!\u0005\n\u0002\r\r|W.\\8o\u0015\t\u0019B#A\u0003ta\u0006\u00148N\u0003\u0002\u0016-\u0005I1m\u001c8oK\u000e$xN\u001d\u0006\u0003/a\tQ\u0001]5o_RT!!\u0007\u000e\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0012aA8sON!\u0001!H\u0012(!\tq\u0012%D\u0001 \u0015\u0005\u0001\u0013!B:dC2\f\u0017B\u0001\u0012 \u0005\u0019\te.\u001f*fMB\u0011A%J\u0007\u0002!%\u0011a\u0005\u0005\u0002\b\u0019><w-\u001b8h!\tAS&D\u0001*\u0015\tQ3&\u0001\u0002j_*\tA&\u0001\u0003kCZ\f\u0017B\u0001\u0018*\u0005%\u0019En\\:fC\ndW-\u0001\u0006qS:|Go\u00159mSR\u001c\u0001\u0001\u0005\u00023k5\t1G\u0003\u00025!\u0005I\u0001/\u0019:uSRLwN\\\u0005\u0003mM\u0012!\u0002U5o_R\u001c\u0006\u000f\\5u\u0003\u0019a\u0014N\\5u}Q\u0011\u0011h\u000f\t\u0003u\u0001i\u0011A\u0004\u0005\u0006_\t\u0001\r!M\u0001\bG\"\fgN\\3m+\u0005q\u0004CA D\u001b\u0005\u0001%BA!C\u0003\u00119'\u000f]2\u000b\u0003)J!\u0001\u0012!\u0003\u001d5\u000bg.Y4fI\u000eC\u0017M\u001c8fY\u0006A1\r[1o]\u0016d\u0007%A\fqS:|GoU3sm\u0016\u0014(\t\\8dW&twm\u0015;vEV\t\u0001\n\u0005\u0002J!:\u0011!JT\u0007\u0002\u0017*\u0011A*T\u0001\u0006aJ|Go\u001c\u0006\u0003#YI!aT&\u0002)AKgn\u001c;Rk\u0016\u0014\u0018pU3sm\u0016\u0014xI\u001d9d\u0013\t\t&K\u0001\u000fQS:|G/U;fef\u001cVM\u001d<fe\ncwnY6j]\u001e\u001cF/\u001e2\u000b\u0005=[\u0015\u0001\u00079j]>$8+\u001a:wKJ\u0014En\\2lS:<7\u000b^;cA\u0005Ia-\u001a;dQ\u0012\u000bG/\u0019\u000b\u0002-B\u0019qk\u00182\u000f\u0005akfBA-]\u001b\u0005Q&BA.1\u0003\u0019a$o\\8u}%\t\u0001%\u0003\u0002_?\u00059\u0001/Y2lC\u001e,\u0017B\u00011b\u0005!IE/\u001a:bi>\u0014(B\u00010 !\t\u0019g-D\u0001e\u0015\t)W*A\u0005eCR\fG/\u00192mK&\u0011q\r\u001a\u0002\n\t\u0006$\u0018\rV1cY\u0016\fQa\u00197pg\u0016$\u0012A\u001b\t\u0003=-L!\u0001\\\u0010\u0003\tUs\u0017\u000e^\u0001\u001b!&tw\u000e^$sa\u000e\u001cVM\u001d<fe\u0012\u000bG/\u0019$fi\u000eDWM\u001d\t\u0003u)\u0019\"AC\u000f\u0015\u00039\fQ!\u00199qYf$\"!O:\t\u000b=b\u0001\u0019A\u0019")
/* loaded from: input_file:org/apache/pinot/connector/spark/common/reader/PinotGrpcServerDataFetcher.class */
public class PinotGrpcServerDataFetcher implements Logging, Closeable {
    private final PinotSplit pinotSplit;
    private final ManagedChannel channel;
    private final PinotQueryServerGrpc.PinotQueryServerBlockingStub pinotServerBlockingStub;
    private transient Logger org$apache$pinot$connector$spark$common$Logging$$log_;

    public static PinotGrpcServerDataFetcher apply(PinotSplit pinotSplit) {
        return PinotGrpcServerDataFetcher$.MODULE$.apply(pinotSplit);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public String logName() {
        String logName;
        logName = logName();
        return logName;
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public Logger log() {
        Logger log;
        log = log();
        return log;
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logInfo(Function0<String> function0) {
        logInfo(function0);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logDebug(Function0<String> function0) {
        logDebug(function0);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logTrace(Function0<String> function0) {
        logTrace(function0);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logWarning(Function0<String> function0) {
        logWarning(function0);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logError(Function0<String> function0) {
        logError(function0);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        logInfo(function0, th);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        logDebug(function0, th);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        logTrace(function0, th);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        logWarning(function0, th);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void logError(Function0<String> function0, Throwable th) {
        logError(function0, th);
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public Logger org$apache$pinot$connector$spark$common$Logging$$log_() {
        return this.org$apache$pinot$connector$spark$common$Logging$$log_;
    }

    @Override // org.apache.pinot.connector.spark.common.Logging
    public void org$apache$pinot$connector$spark$common$Logging$$log__$eq(Logger logger) {
        this.org$apache$pinot$connector$spark$common$Logging$$log_ = logger;
    }

    private ManagedChannel channel() {
        return this.channel;
    }

    private PinotQueryServerGrpc.PinotQueryServerBlockingStub pinotServerBlockingStub() {
        return this.pinotServerBlockingStub;
    }

    public Iterator<DataTable> fetchData() {
        String realtimeSelectQuery;
        Server.ServerRequest.Builder addAllSegments = Server.ServerRequest.newBuilder().putMetadata(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, "true").addAllSegments((Iterable) JavaConverters$.MODULE$.seqAsJavaListConverter(this.pinotSplit.serverAndSegments().segments()).asJava());
        TableType serverType = this.pinotSplit.serverAndSegments().serverType();
        if (TableType.OFFLINE.equals(serverType)) {
            realtimeSelectQuery = this.pinotSplit.query().offlineSelectQuery();
        } else {
            if (!TableType.REALTIME.equals(serverType)) {
                throw new MatchError(serverType);
            }
            realtimeSelectQuery = this.pinotSplit.query().realtimeSelectQuery();
        }
        try {
            return ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(pinotServerBlockingStub().submit(addAllSegments.setSql(realtimeSelectQuery).build())).asScala()).withFilter(serverResponse -> {
                return BoxesRunTime.boxToBoolean($anonfun$fetchData$1(serverResponse));
            }).map(serverResponse2 -> {
                return DataTableFactory.getDataTable(serverResponse2.getPayload().toByteArray());
            }).filter(dataTable -> {
                return BoxesRunTime.boxToBoolean($anonfun$fetchData$3(dataTable));
            });
        } catch (StatusRuntimeException e) {
            logError(() -> {
                return new StringBuilder(43).append("Caught exception when reading data from ").append(this.pinotSplit.serverAndSegments().serverHost()).append(":").append(this.pinotSplit.serverAndSegments().serverGrpcPort()).append(": ").append(e).toString();
            });
            throw e;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (channel().isShutdown()) {
            return;
        }
        channel().shutdown();
        logInfo(() -> {
            return "Pinot server connection closed";
        });
    }

    public static final /* synthetic */ boolean $anonfun$fetchData$1(Server.ServerResponse serverResponse) {
        String str = serverResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
        return str != null ? str.equals(CommonConstants.Query.Response.ResponseType.DATA) : CommonConstants.Query.Response.ResponseType.DATA == 0;
    }

    public static final /* synthetic */ boolean $anonfun$fetchData$3(DataTable dataTable) {
        return dataTable.getNumberOfRows() > 0;
    }

    /* JADX WARN: Type inference failed for: r1v5, types: [io.grpc.ManagedChannelBuilder] */
    public PinotGrpcServerDataFetcher(PinotSplit pinotSplit) {
        this.pinotSplit = pinotSplit;
        Logging.$init$(this);
        this.channel = ManagedChannelBuilder.forAddress(pinotSplit.serverAndSegments().serverHost(), pinotSplit.serverAndSegments().serverGrpcPort()).usePlaintext().maxInboundMessageSize(Integer.MAX_VALUE).build();
        this.pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel());
    }
}
