package org.apache.pinot.query.service.dispatch.timeseries;

import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.runtime.timeseries.serde.TimeSeriesBlockSerde;
import org.apache.pinot.tsdb.spi.series.TimeSeriesBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/query/service/dispatch/timeseries/TimeSeriesDispatchObserver.class */
public class TimeSeriesDispatchObserver implements StreamObserver<Worker.TimeSeriesResponse> {
    public static final int MAX_QUEUE_CAPACITY = 4096;
    private static final Logger LOGGER = LoggerFactory.getLogger(TimeSeriesDispatchObserver.class);
    private final Map<String, BlockingQueue<Object>> _exchangeReceiversByPlanId;

    public TimeSeriesDispatchObserver(Map<String, BlockingQueue<Object>> map) {
        this._exchangeReceiversByPlanId = map;
    }

    public void onNext(Worker.TimeSeriesResponse timeSeriesResponse) {
        if (timeSeriesResponse.containsMetadata("errorType")) {
            onError(new Throwable(String.format("Error in server (type: %s): %s", timeSeriesResponse.getMetadataOrDefault("errorType", ""), timeSeriesResponse.getMetadataOrDefault("error", ""))));
            return;
        }
        String str = (String) timeSeriesResponse.getMetadataMap().get("planId");
        TimeSeriesBlock timeSeriesBlock = null;
        TimeSeriesBlock timeSeriesBlock2 = null;
        try {
            timeSeriesBlock = TimeSeriesBlockSerde.deserializeTimeSeriesBlock(timeSeriesResponse.getPayload().asReadOnlyByteBuffer());
        } catch (Throwable th) {
            timeSeriesBlock2 = th;
        }
        BlockingQueue<Object> blockingQueue = this._exchangeReceiversByPlanId.get(str);
        if (blockingQueue == null) {
            String format = String.format("Receiver is not initialized for planId: %s. Receivers exist only for planIds: %s", str, this._exchangeReceiversByPlanId.keySet());
            LOGGER.warn(format);
            onError(new IllegalStateException(format));
        } else {
            if (blockingQueue.offer(timeSeriesBlock2 != null ? timeSeriesBlock2 : timeSeriesBlock)) {
                return;
            }
            onError(new RuntimeException(String.format("Offer to receiver queue (capacity=%s) for planId: %s failed", Integer.valueOf(blockingQueue.remainingCapacity()), str)));
        }
    }

    public void onError(Throwable th) {
        Iterator<BlockingQueue<Object>> it = this._exchangeReceiversByPlanId.values().iterator();
        while (it.hasNext()) {
            it.next().offer(th);
        }
    }

    public void onCompleted() {
    }
}
