package org.apache.pinot.core.query.reduce;

import com.google.common.collect.ImmutableMap;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.core.query.reduce.BaseReduceService;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.class */
public class StreamingReduceServiceTest {
    @Test
    public void testThreadExceptionTransfer() {
        Iterator it2 = (Iterator) Mockito.mock(Iterator.class);
        Mockito.when(Boolean.valueOf(it2.hasNext())).thenReturn(true);
        String str = "Some exception";
        Mockito.when((Server.ServerResponse) it2.next()).thenThrow(new Throwable[]{new RuntimeException("Some exception")});
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        ServerRoutingInstance serverRoutingInstance = new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE);
        Assert.assertTrue(verifyException(() -> {
            StreamingReduceService.processIterativeServerResponse((StreamingReducer) Mockito.mock(StreamingReducer.class), newFixedThreadPool, ImmutableMap.of(serverRoutingInstance, it2), 1000L, (BaseReduceService.ExecutionStatsAggregator) Mockito.mock(BaseReduceService.ExecutionStatsAggregator.class));
            return null;
        }, th -> {
            return th.getMessage().contains(str);
        }));
    }

    @Test
    public void testExecutionTimeout() throws Exception {
        Iterator it2 = (Iterator) Mockito.mock(Iterator.class);
        Mockito.when(Boolean.valueOf(it2.hasNext())).thenReturn(true);
        Mockito.when((Server.ServerResponse) it2.next()).then(new Answer<Void>() { // from class: org.apache.pinot.core.query.reduce.StreamingReduceServiceTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Void m13437answer(InvocationOnMock invocationOnMock) throws Throwable {
                Thread.sleep(1000L);
                return null;
            }
        });
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        ServerRoutingInstance serverRoutingInstance = new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE);
        Assert.assertTrue(verifyException(() -> {
            StreamingReduceService.processIterativeServerResponse((StreamingReducer) Mockito.mock(StreamingReducer.class), newFixedThreadPool, ImmutableMap.of(serverRoutingInstance, it2), 10L, (BaseReduceService.ExecutionStatsAggregator) Mockito.mock(BaseReduceService.ExecutionStatsAggregator.class));
            return null;
        }, th -> {
            return th instanceof TimeoutException;
        }));
    }

    private static boolean verifyException(Callable<Void> callable, Predicate<Throwable> predicate) {
        boolean z = false;
        if (callable == null || predicate == null) {
            throw new RuntimeException("verifyException method needs two non-null lambdas");
        }
        try {
            callable.call();
        } catch (Exception e) {
            Throwable th = e;
            while (true) {
                Throwable th2 = th;
                if (th2 == null || th2.getCause() == th2 || z) {
                    break;
                }
                z = predicate.test(th2);
                th = th2.getCause();
            }
        }
        return z;
    }
}
