package org.apache.pinot.controller.api;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.sun.net.httpserver.HttpHandler;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.utils.FakeHttpServer;
import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.class */
public class ConsumingSegmentInfoReaderStatelessTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReaderStatelessTest.class);
    private static final String TABLE_NAME = "myTable_REALTIME";
    private static final String SEGMENT_NAME_PARTITION_0 = "table__0__29__12345";
    private static final String SEGMENT_NAME_PARTITION_1 = "table__1__32__12345";
    private static final int TIMEOUT_MSEC = 10000;
    private static final int EXTENDED_TIMEOUT_FACTOR = 100;
    private PinotHelixResourceManager _helix;
    private final Executor _executor = Executors.newFixedThreadPool(1);
    private final HttpConnectionManager _connectionManager = new MultiThreadedHttpConnectionManager();
    private final Map<String, FakeConsumingInfoServer> _serverMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest$FakeConsumingInfoServer.class */
    public static class FakeConsumingInfoServer extends FakeHttpServer {
        List<SegmentConsumerInfo> _consumerInfos;

        FakeConsumingInfoServer(List<SegmentConsumerInfo> list) {
            this._consumerInfos = list;
        }
    }

    @BeforeClass
    public void setUp() throws IOException {
        this._helix = (PinotHelixResourceManager) Mockito.mock(PinotHelixResourceManager.class);
        HashMap hashMap = new HashMap();
        hashMap.put("0", "150");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("1", "150");
        FakeConsumingInfoServer fakeConsumingInfoServer = new FakeConsumingInfoServer(Lists.newArrayList(new SegmentConsumerInfo[]{new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0L, hashMap), new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0L, hashMap2)}));
        fakeConsumingInfoServer.start("/tables/", createHandler(200, fakeConsumingInfoServer._consumerInfos, 0));
        this._serverMap.put("server0", fakeConsumingInfoServer);
        FakeConsumingInfoServer fakeConsumingInfoServer2 = new FakeConsumingInfoServer(Lists.newArrayList(new SegmentConsumerInfo[]{new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0L, hashMap), new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0L, hashMap2)}));
        fakeConsumingInfoServer2.start("/tables/", createHandler(200, fakeConsumingInfoServer2._consumerInfos, 0));
        this._serverMap.put("server1", fakeConsumingInfoServer2);
        FakeConsumingInfoServer fakeConsumingInfoServer3 = new FakeConsumingInfoServer(Lists.newArrayList(new SegmentConsumerInfo[]{new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0L, hashMap), new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0L, hashMap2)}));
        fakeConsumingInfoServer3.start("/tables/", createHandler(200, fakeConsumingInfoServer3._consumerInfos, 0));
        this._serverMap.put("server2", fakeConsumingInfoServer3);
        FakeConsumingInfoServer fakeConsumingInfoServer4 = new FakeConsumingInfoServer(Lists.newArrayList(new SegmentConsumerInfo[]{new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0L, hashMap2)}));
        fakeConsumingInfoServer4.start("/tables/", createHandler(200, fakeConsumingInfoServer4._consumerInfos, 0));
        this._serverMap.put("server3", fakeConsumingInfoServer4);
        FakeConsumingInfoServer fakeConsumingInfoServer5 = new FakeConsumingInfoServer(Lists.newArrayList(new SegmentConsumerInfo[]{new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0L, hashMap), new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0L, hashMap2)}));
        fakeConsumingInfoServer5.start("/tables/", createHandler(200, fakeConsumingInfoServer5._consumerInfos, 1000000));
        this._serverMap.put("server4", fakeConsumingInfoServer5);
    }

    @AfterClass
    public void tearDown() {
        Iterator<Map.Entry<String, FakeConsumingInfoServer>> it = this._serverMap.entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().stop();
        }
    }

    private HttpHandler createHandler(int i, List<SegmentConsumerInfo> list, int i2) {
        return httpExchange -> {
            if (i2 > 0) {
                try {
                    Thread.sleep(i2);
                } catch (InterruptedException e) {
                    LOGGER.info("Handler interrupted during sleep");
                }
            }
            String objectToString = JsonUtils.objectToString(list);
            httpExchange.sendResponseHeaders(i, objectToString.length());
            OutputStream responseBody = httpExchange.getResponseBody();
            responseBody.write(objectToString.getBytes());
            responseBody.close();
        };
    }

    private Map<String, List<String>> subsetOfServerSegments(String... strArr) {
        HashMap hashMap = new HashMap();
        for (String str : strArr) {
            hashMap.put(str, (List) this._serverMap.get(str)._consumerInfos.stream().map((v0) -> {
                return v0.getSegmentName();
            }).collect(Collectors.toList()));
        }
        return hashMap;
    }

    private BiMap<String, String> serverEndpoints(String... strArr) {
        HashBiMap create = HashBiMap.create(strArr.length);
        for (String str : strArr) {
            create.put(str, this._serverMap.get(str)._endpoint);
        }
        return create;
    }

    private void mockSetup(String[] strArr, Set<String> set) throws InvalidConfigException {
        Mockito.when(this._helix.getServerToSegmentsMap(Matchers.anyString())).thenAnswer(invocationOnMock -> {
            return subsetOfServerSegments(strArr);
        });
        Mockito.when(this._helix.getDataInstanceAdminEndpoints(ArgumentMatchers.anySet())).thenAnswer(invocationOnMock2 -> {
            return serverEndpoints(strArr);
        });
        Mockito.when(this._helix.getConsumingSegments(Matchers.anyString())).thenAnswer(invocationOnMock3 -> {
            return set;
        });
        Mockito.when(this._helix.getServersForSegment(Matchers.anyString(), Matchers.anyString())).thenAnswer(invocationOnMock4 -> {
            return new HashSet(Arrays.asList(strArr));
        });
    }

    private ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner(String[] strArr, Set<String> set, String str) throws InvalidConfigException {
        mockSetup(strArr, set);
        return new ConsumingSegmentInfoReader(this._executor, this._connectionManager, this._helix).getConsumingSegmentsInfo(str, TIMEOUT_MSEC);
    }

    private TableStatus.IngestionStatus testRunnerIngestionStatus(String[] strArr, Set<String> set, String str) throws InvalidConfigException {
        mockSetup(strArr, set);
        return new ConsumingSegmentInfoReader(this._executor, this._connectionManager, this._helix).getIngestionStatus(str, TIMEOUT_MSEC);
    }

    private void checkIngestionStatus(String[] strArr, Set<String> set, TableStatus.IngestionState ingestionState) throws InvalidConfigException {
        Assert.assertEquals(testRunnerIngestionStatus(strArr, set, TABLE_NAME).getIngestionState(), ingestionState);
    }

    @Test
    public void testEmptyTable() throws InvalidConfigException {
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(new String[0], Collections.emptySet(), TABLE_NAME);
        checkIngestionStatus(new String[0], Collections.emptySet(), TableStatus.IngestionState.HEALTHY);
        Assert.assertTrue(testRunner._segmentToConsumingInfoMap.isEmpty());
    }

    @Test
    public void testHappyPath() throws InvalidConfigException {
        String[] strArr = {"server0", "server1"};
        HashSet newHashSet = Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1});
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(strArr, newHashSet, TABLE_NAME);
        checkIngestionStatus(strArr, newHashSet, TableStatus.IngestionState.HEALTHY);
        List list = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
        Assert.assertEquals(list.size(), 2);
        Iterator it = list.iterator();
        while (it.hasNext()) {
            checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) it.next(), Sets.newHashSet(new String[]{"server0", "server1"}), CommonConstants.ConsumerState.CONSUMING.toString(), "0", "150");
        }
        List list2 = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
        Assert.assertEquals(list2.size(), 2);
        Iterator it2 = list2.iterator();
        while (it2.hasNext()) {
            checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) it2.next(), Sets.newHashSet(new String[]{"server0", "server1"}), CommonConstants.ConsumerState.CONSUMING.toString(), "1", "150");
        }
    }

    @Test
    public void testNotConsumingState() throws InvalidConfigException {
        String[] strArr = {"server0", "server2"};
        HashSet newHashSet = Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1});
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(strArr, newHashSet, TABLE_NAME);
        checkIngestionStatus(strArr, newHashSet, TableStatus.IngestionState.UNHEALTHY);
        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> list = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
        Assert.assertEquals(list.size(), 2);
        for (ConsumingSegmentInfoReader.ConsumingSegmentInfo consumingSegmentInfo : list) {
            if (consumingSegmentInfo._serverName.equals("server0")) {
                checkConsumingSegmentInfo(consumingSegmentInfo, Sets.newHashSet(new String[]{"server0"}), CommonConstants.ConsumerState.CONSUMING.toString(), "0", "150");
            } else {
                checkConsumingSegmentInfo(consumingSegmentInfo, Sets.newHashSet(new String[]{"server2"}), CommonConstants.ConsumerState.NOT_CONSUMING.toString(), "0", "150");
            }
        }
        List list2 = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
        Assert.assertEquals(list2.size(), 2);
        Iterator it = list2.iterator();
        while (it.hasNext()) {
            checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) it.next(), Sets.newHashSet(new String[]{"server0", "server2"}), CommonConstants.ConsumerState.CONSUMING.toString(), "1", "150");
        }
    }

    @Test
    public void testNoConsumerButConsumingInIdealState() throws InvalidConfigException {
        String[] strArr = {"server3"};
        HashSet newHashSet = Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1});
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(strArr, newHashSet, TABLE_NAME);
        checkIngestionStatus(strArr, newHashSet, TableStatus.IngestionState.UNHEALTHY);
        Assert.assertTrue(((List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0)).isEmpty());
        List list = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
        Assert.assertEquals(list.size(), 1);
        checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) list.get(0), Sets.newHashSet(new String[]{"server3"}), CommonConstants.ConsumerState.CONSUMING.toString(), "1", "150");
    }

    @Test
    public void testNoConsumerOfflineInIdealState() throws InvalidConfigException {
        String[] strArr = {"server3"};
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(strArr, Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_1}), TABLE_NAME);
        checkIngestionStatus(strArr, Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1}), TableStatus.IngestionState.UNHEALTHY);
        Assert.assertNull((List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0));
        List list = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
        Assert.assertEquals(list.size(), 1);
        checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) list.get(0), Sets.newHashSet(new String[]{"server3"}), CommonConstants.ConsumerState.CONSUMING.toString(), "1", "150");
    }

    @Test
    public void testErrorFromServer() throws InvalidConfigException {
        String[] strArr = {"server0", "server4"};
        HashSet newHashSet = Sets.newHashSet(new String[]{SEGMENT_NAME_PARTITION_0, SEGMENT_NAME_PARTITION_1});
        ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap testRunner = testRunner(strArr, newHashSet, TABLE_NAME);
        checkIngestionStatus(strArr, newHashSet, TableStatus.IngestionState.UNHEALTHY);
        List list = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_0);
        Assert.assertEquals(list.size(), 1);
        checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) list.get(0), Sets.newHashSet(new String[]{"server0"}), CommonConstants.ConsumerState.CONSUMING.toString(), "0", "150");
        List list2 = (List) testRunner._segmentToConsumingInfoMap.get(SEGMENT_NAME_PARTITION_1);
        Assert.assertEquals(list2.size(), 1);
        checkConsumingSegmentInfo((ConsumingSegmentInfoReader.ConsumingSegmentInfo) list2.get(0), Sets.newHashSet(new String[]{"server0"}), CommonConstants.ConsumerState.CONSUMING.toString(), "1", "150");
    }

    private void checkConsumingSegmentInfo(ConsumingSegmentInfoReader.ConsumingSegmentInfo consumingSegmentInfo, Set<String> set, String str, String str2, String str3) {
        Assert.assertTrue(set.contains(consumingSegmentInfo._serverName));
        Assert.assertEquals(consumingSegmentInfo._consumerState, str);
        Assert.assertEquals((String) consumingSegmentInfo._partitionToOffsetMap.get(str2), str3);
    }
}
