package org.apache.pinot.query.mailbox;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
import org.apache.pinot.core.common.datablock.MetadataBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.class */
public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase {
    @Test
    public void testHappyPath() throws Exception {
        Preconditions.checkState(this._mailboxServices.size() >= 2);
        Map.Entry<Integer, GrpcMailboxService> firstEntry = this._mailboxServices.firstEntry();
        Map.Entry<Integer, GrpcMailboxService> lastEntry = this._mailboxServices.lastEntry();
        String format = String.format("happyPath:localhost:%d:localhost:%d", firstEntry.getKey(), lastEntry.getKey());
        SendingMailbox sendingMailbox = firstEntry.getValue().getSendingMailbox(format);
        ReceivingMailbox receivingMailbox = lastEntry.getValue().getReceivingMailbox(format);
        Mailbox.MailboxContent testMailboxContent = getTestMailboxContent(format);
        sendingMailbox.send(testMailboxContent);
        TestUtils.waitForCondition(r3 -> {
            return Boolean.valueOf(receivingMailbox.isInitialized());
        }, 5000L, "Receiving mailbox initialize failed!");
        Assert.assertEquals((Mailbox.MailboxContent) receivingMailbox.receive(), testMailboxContent);
        sendingMailbox.complete();
        TestUtils.waitForCondition(r32 -> {
            return Boolean.valueOf(receivingMailbox.isClosed());
        }, 5000L, "Receiving mailbox is not closed properly!");
    }

    @Test
    public void testGrpcException() throws Exception {
        Preconditions.checkState(this._mailboxServices.size() >= 2);
        Map.Entry<Integer, GrpcMailboxService> firstEntry = this._mailboxServices.firstEntry();
        Map.Entry<Integer, GrpcMailboxService> lastEntry = this._mailboxServices.lastEntry();
        String format = String.format("exception:localhost:%d:localhost:%d", firstEntry.getKey(), lastEntry.getKey());
        SendingMailbox sendingMailbox = firstEntry.getValue().getSendingMailbox(format);
        ReceivingMailbox receivingMailbox = lastEntry.getValue().getReceivingMailbox(format);
        sendingMailbox.send(getTooLargeMailboxContent(format));
        TestUtils.waitForCondition(r3 -> {
            return Boolean.valueOf(receivingMailbox.isInitialized());
        }, 5000L, "Receiving mailbox initialize failed!");
        Mailbox.MailboxContent mailboxContent = (Mailbox.MailboxContent) receivingMailbox.receive();
        Assert.assertNotNull(mailboxContent);
        ByteBuffer asReadOnlyByteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
        Assert.assertTrue(asReadOnlyByteBuffer.hasRemaining());
        BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(asReadOnlyByteBuffer);
        Assert.assertTrue((dataBlock instanceof MetadataBlock) && !dataBlock.getExceptions().isEmpty());
    }

    private Mailbox.MailboxContent getTestMailboxContent(String str) throws IOException {
        return Mailbox.MailboxContent.newBuilder().setMailboxId(str).putAllMetadata(ImmutableMap.of("key", "value", "end.of.stream", "true")).setPayload(ByteString.copyFrom(new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(new DataSchema(new String[]{"foo", "bar"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}))).toBytes())).build();
    }

    private Mailbox.MailboxContent getTooLargeMailboxContent(String str) throws IOException {
        return Mailbox.MailboxContent.newBuilder().setMailboxId(str).putAllMetadata(ImmutableMap.of("key", "value", "end.of.stream", "true")).setPayload(ByteString.copyFrom(new byte[16000000])).build();
    }
}
