package org.apache.pinot.query.mailbox;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.class */
public class GrpcMailboxServiceTest {
    private static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
    private final AtomicReference<Consumer<MailboxIdentifier>> _mail1GotData = new AtomicReference<>(mailboxIdentifier -> {
    });
    private final AtomicReference<Consumer<MailboxIdentifier>> _mail2GotData = new AtomicReference<>(mailboxIdentifier -> {
    });
    private GrpcMailboxService _mailboxService1;
    private GrpcMailboxService _mailboxService2;

    @BeforeClass
    public void setUp() throws Exception {
        PinotConfiguration pinotConfiguration = new PinotConfiguration(Collections.singletonMap("pinot.query.runner.max.msg.size.bytes", 4000000));
        this._mailboxService1 = new GrpcMailboxService("localhost", QueryTestUtils.getAvailablePort(), pinotConfiguration, mailboxIdentifier -> {
            this._mail1GotData.get().accept(mailboxIdentifier);
        });
        this._mailboxService1.start();
        this._mailboxService2 = new GrpcMailboxService("localhost", QueryTestUtils.getAvailablePort(), pinotConfiguration, mailboxIdentifier2 -> {
            this._mail2GotData.get().accept(mailboxIdentifier2);
        });
        this._mailboxService2.start();
    }

    @AfterClass
    public void tearDown() {
        this._mailboxService1.shutdown();
        this._mailboxService2.shutdown();
    }

    @Test(timeOut = 10000)
    public void testHappyPath() throws Exception {
        StringMailboxIdentifier stringMailboxIdentifier = new StringMailboxIdentifier("happypath", "localhost", this._mailboxService1.getMailboxPort(), "localhost", this._mailboxService2.getMailboxPort());
        SendingMailbox sendingMailbox = this._mailboxService1.getSendingMailbox(stringMailboxIdentifier);
        ReceivingMailbox receivingMailbox = this._mailboxService2.getReceivingMailbox(stringMailboxIdentifier);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this._mail2GotData.set(mailboxIdentifier -> {
            countDownLatch.countDown();
        });
        TransferableBlock testTransferableBlock = getTestTransferableBlock();
        sendingMailbox.send(testTransferableBlock);
        countDownLatch.await();
        Assert.assertEquals(((TransferableBlock) receivingMailbox.receive()).getDataBlock().toBytes(), testTransferableBlock.getDataBlock().toBytes());
        sendingMailbox.complete();
        TestUtils.waitForCondition(r3 -> {
            return Boolean.valueOf(receivingMailbox.isClosed());
        }, 5000L, "Receiving mailbox is not closed properly!");
    }

    @Test(timeOut = 10000)
    public void testGrpcException() throws Exception {
        StringMailboxIdentifier stringMailboxIdentifier = new StringMailboxIdentifier("exception", "localhost", this._mailboxService1.getMailboxPort(), "localhost", this._mailboxService2.getMailboxPort());
        SendingMailbox sendingMailbox = this._mailboxService1.getSendingMailbox(stringMailboxIdentifier);
        ReceivingMailbox receivingMailbox = this._mailboxService2.getReceivingMailbox(stringMailboxIdentifier);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this._mail2GotData.set(mailboxIdentifier -> {
            countDownLatch.countDown();
        });
        sendingMailbox.send(getTooLargeTransferableBlock());
        countDownLatch.await();
        TransferableBlock transferableBlock = (TransferableBlock) receivingMailbox.receive();
        Assert.assertNotNull(transferableBlock);
        DataBlock dataBlock = transferableBlock.getDataBlock();
        Assert.assertTrue(dataBlock instanceof MetadataBlock);
        Assert.assertFalse(dataBlock.getExceptions().isEmpty());
    }

    private TransferableBlock getTestTransferableBlock() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(createRow(0, "test_string"));
        return new TransferableBlock(arrayList, TEST_DATA_SCHEMA, DataBlock.Type.ROW);
    }

    private TransferableBlock getTooLargeTransferableBlock() {
        ArrayList arrayList = new ArrayList(1000000);
        for (int i = 0; i < 1000000; i++) {
            arrayList.add(createRow(0, "test_string"));
        }
        return new TransferableBlock(arrayList, TEST_DATA_SCHEMA, DataBlock.Type.ROW);
    }

    private Object[] createRow(int i, String str) {
        return new Object[]{Integer.valueOf(i), str};
    }
}
