package org.apache.pinot.query.mailbox;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;

/* loaded from: input_file:org/apache/pinot/query/mailbox/GrpcSendingMailbox.class */
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
    private final GrpcMailboxService _mailboxService;
    private final String _mailboxId;
    private final AtomicBoolean _initialized = new AtomicBoolean(false);
    private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
    private MailboxStatusStreamObserver _statusStreamObserver;

    public GrpcSendingMailbox(String str, GrpcMailboxService grpcMailboxService) {
        this._mailboxService = grpcMailboxService;
        this._mailboxId = str;
        this._initialized.set(false);
    }

    public void init() throws UnsupportedOperationException {
        PinotMailboxGrpc.PinotMailboxStub newStub = PinotMailboxGrpc.newStub(this._mailboxService.getChannel(this._mailboxId));
        this._statusStreamObserver = new MailboxStatusStreamObserver();
        this._statusStreamObserver.init(newStub.open(this._statusStreamObserver));
        this._statusStreamObserver.send(Mailbox.MailboxContent.newBuilder().setMailboxId(this._mailboxId).putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY, "true").build());
        this._initialized.set(true);
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public void send(TransferableBlock transferableBlock) throws UnsupportedOperationException {
        if (!this._initialized.get()) {
            init();
        }
        this._statusStreamObserver.send(toMailboxContent(transferableBlock.getDataBlock()));
        this._totalMsgSent.incrementAndGet();
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public void complete() {
        this._statusStreamObserver.complete();
    }

    @Override // org.apache.pinot.query.mailbox.SendingMailbox
    public String getMailboxId() {
        return this._mailboxId;
    }

    private Mailbox.MailboxContent toMailboxContent(DataBlock dataBlock) {
        try {
            Mailbox.MailboxContent.Builder payload = Mailbox.MailboxContent.newBuilder().setMailboxId(this._mailboxId).setPayload(ByteString.copyFrom(dataBlock.toBytes()));
            if (dataBlock instanceof MetadataBlock) {
                payload.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true");
            }
            return payload.build();
        } catch (IOException e) {
            throw new RuntimeException("Error converting to mailbox content", e);
        }
    }
}
