package org.apache.pinot.core.data.manager.realtime;

import com.google.common.cache.LoadingCache;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager;
import org.apache.pinot.spi.stream.StreamConfig;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.class */
public class RealtimeConsumptionRateManagerTest {
    private static final int NUM_PARTITIONS_TOPIC_A = 10;
    private static final int NUM_PARTITIONS_TOPIC_B = 20;
    private static final String TABLE_NAME = "table-XYZ";
    private static final double DELTA = 1.0E-4d;
    private static RealtimeConsumptionRateManager _consumptionRateManager;
    private static final Double RATE_LIMIT_FOR_ENTIRE_TOPIC = Double.valueOf(50.0d);
    private static final StreamConfig STREAM_CONFIG_A = (StreamConfig) Mockito.mock(StreamConfig.class);
    private static final StreamConfig STREAM_CONFIG_B = (StreamConfig) Mockito.mock(StreamConfig.class);
    private static final StreamConfig STREAM_CONFIG_C = (StreamConfig) Mockito.mock(StreamConfig.class);

    @Test
    public void testCreateRateLimiter() {
        Assert.assertEquals(5.0d, _consumptionRateManager.createRateLimiter(STREAM_CONFIG_A, TABLE_NAME).getRate(), DELTA);
        Assert.assertEquals(2.5d, _consumptionRateManager.createRateLimiter(STREAM_CONFIG_B, TABLE_NAME).getRate(), DELTA);
        Assert.assertEquals(_consumptionRateManager.createRateLimiter(STREAM_CONFIG_C, TABLE_NAME), RealtimeConsumptionRateManager.NOOP_RATE_LIMITER);
    }

    @Test
    public void testBuildCache() throws Exception {
        RealtimeConsumptionRateManager.PartitionCountFetcher partitionCountFetcher = (RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.mock(RealtimeConsumptionRateManager.PartitionCountFetcher.class);
        LoadingCache buildCache = RealtimeConsumptionRateManager.buildCache(partitionCountFetcher, 500L, TimeUnit.MILLISECONDS);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(Integer.valueOf(NUM_PARTITIONS_TOPIC_A));
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(Integer.valueOf(NUM_PARTITIONS_TOPIC_B));
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), NUM_PARTITIONS_TOPIC_A);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), NUM_PARTITIONS_TOPIC_A);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), NUM_PARTITIONS_TOPIC_A);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), NUM_PARTITIONS_TOPIC_B);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), NUM_PARTITIONS_TOPIC_B);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(1))).fetch(STREAM_CONFIG_A);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(1))).fetch(STREAM_CONFIG_B);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(11);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(21);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), NUM_PARTITIONS_TOPIC_A);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), NUM_PARTITIONS_TOPIC_B);
        Thread.sleep(550L);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), 21);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), 21);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(2))).fetch(STREAM_CONFIG_A);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(2))).fetch(STREAM_CONFIG_B);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn((Object) null);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_B)).thenReturn(22);
        Thread.sleep(550L);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_A)).intValue(), 11);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), 22);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_B)).intValue(), 22);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(3))).fetch(STREAM_CONFIG_A);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(3))).fetch(STREAM_CONFIG_B);
        Mockito.when(partitionCountFetcher.fetch(STREAM_CONFIG_C)).thenReturn((Object) null);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_C)).intValue(), 1);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_C)).intValue(), 1);
        Assert.assertEquals(((Integer) buildCache.get(STREAM_CONFIG_C)).intValue(), 1);
        ((RealtimeConsumptionRateManager.PartitionCountFetcher) Mockito.verify(partitionCountFetcher, Mockito.times(1))).fetch(STREAM_CONFIG_C);
    }

    @Test
    public void testMetricEmitter() {
        double d = 2.0d * 60.0d;
        RealtimeConsumptionRateManager.MetricEmitter metricEmitter = new RealtimeConsumptionRateManager.MetricEmitter((ServerMetrics) Mockito.mock(ServerMetrics.class), "tableA-topicB-partition5");
        int[] iArr = {NUM_PARTITIONS_TOPIC_A, NUM_PARTITIONS_TOPIC_B, 5, 25};
        Assert.assertEquals(metricEmitter.emitMetric(iArr[0], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:00:02Z"), ZoneOffset.UTC).instant()), 0);
        Assert.assertEquals(metricEmitter.emitMetric(iArr[1], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:00:10Z"), ZoneOffset.UTC).instant()), 0);
        Assert.assertEquals(metricEmitter.emitMetric(iArr[2], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:00:30Z"), ZoneOffset.UTC).instant()), 0);
        Assert.assertEquals(metricEmitter.emitMetric(iArr[3], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:00:55Z"), ZoneOffset.UTC).instant()), 0);
        Instant instant = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), ZoneOffset.UTC).instant();
        int calcExpectedRatio = calcExpectedRatio(d, sum(iArr));
        int[] iArr2 = {35};
        Assert.assertEquals(metricEmitter.emitMetric(iArr2[0], 2.0d, instant), calcExpectedRatio);
        Instant instant2 = Clock.fixed(Instant.parse("2022-08-10T12:02:25Z"), ZoneOffset.UTC).instant();
        int calcExpectedRatio2 = calcExpectedRatio(d, sum(iArr2));
        int[] iArr3 = {0};
        Assert.assertEquals(metricEmitter.emitMetric(iArr3[0], 2.0d, instant2), calcExpectedRatio2);
        Instant instant3 = Clock.fixed(Instant.parse("2022-08-10T12:03:15Z"), ZoneOffset.UTC).instant();
        int calcExpectedRatio3 = calcExpectedRatio(d, sum(iArr3));
        int[] iArr4 = {NUM_PARTITIONS_TOPIC_A, NUM_PARTITIONS_TOPIC_B};
        Assert.assertEquals(metricEmitter.emitMetric(iArr4[0], 2.0d, instant3), calcExpectedRatio3);
        Assert.assertEquals(metricEmitter.emitMetric(iArr4[1], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:03:20Z"), ZoneOffset.UTC).instant()), calcExpectedRatio3);
        Assert.assertEquals(metricEmitter.emitMetric(new int[]{5}[0], 2.0d, Clock.fixed(Instant.parse("2022-08-10T12:04:30Z"), ZoneOffset.UTC).instant()), calcExpectedRatio(d, sum(iArr4)));
    }

    private int calcExpectedRatio(double d, int i) {
        return (int) Math.round((i / d) * 100.0d);
    }

    private int sum(int[] iArr) {
        return Arrays.stream(iArr).sum();
    }

    static {
        LoadingCache loadingCache = (LoadingCache) Mockito.mock(LoadingCache.class);
        try {
            Mockito.when((Integer) loadingCache.get(STREAM_CONFIG_A)).thenReturn(Integer.valueOf(NUM_PARTITIONS_TOPIC_A));
            Mockito.when((Integer) loadingCache.get(STREAM_CONFIG_B)).thenReturn(Integer.valueOf(NUM_PARTITIONS_TOPIC_B));
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        Mockito.when(STREAM_CONFIG_A.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
        Mockito.when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
        Mockito.when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
        _consumptionRateManager = new RealtimeConsumptionRateManager(loadingCache);
    }
}
