package org.apache.pinot.core.data.manager.realtime;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.class */
public class IngestionDelayTrackerTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
    private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
    private final ServerMetrics _serverMetrics = (ServerMetrics) Mockito.mock(ServerMetrics.class);
    private final RealtimeTableDataManager _realtimeTableDataManager = (RealtimeTableDataManager) Mockito.mock(RealtimeTableDataManager.class);

    private IngestionDelayTracker createTracker() {
        IngestionDelayTracker ingestionDelayTracker = new IngestionDelayTracker(this._serverMetrics, REALTIME_TABLE_NAME, this._realtimeTableDataManager, () -> {
            return true;
        });
        Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0L);
        return ingestionDelayTracker;
    }

    @Test
    public void testTrackerConstructors() {
        IngestionDelayTracker ingestionDelayTracker = new IngestionDelayTracker(this._serverMetrics, REALTIME_TABLE_NAME, this._realtimeTableDataManager, () -> {
            return true;
        });
        ingestionDelayTracker.setClock(Clock.systemUTC());
        Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0L);
        Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
        ingestionDelayTracker.shutdown();
        IngestionDelayTracker ingestionDelayTracker2 = new IngestionDelayTracker(this._serverMetrics, REALTIME_TABLE_NAME, this._realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () -> {
            return true;
        });
        Assert.assertEquals(ingestionDelayTracker2.getPartitionIngestionDelayMs(0), 0L);
        Assert.assertEquals(ingestionDelayTracker2.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
        try {
            new IngestionDelayTracker(this._serverMetrics, REALTIME_TABLE_NAME, this._realtimeTableDataManager, 0, () -> {
                return true;
            });
            Assert.fail("Must have asserted due to invalid arguments");
        } catch (Exception e) {
            if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) {
                Assert.fail(String.format("Unexpected exception: %s:%s", e.getClass(), e.getMessage()));
            }
        }
    }

    @Test
    public void testRecordIngestionDelayWithNoAging() {
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123L).getSegmentName();
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, 234L).getSegmentName();
        IngestionDelayTracker createTracker = createTracker();
        Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        createTracker.setClock(fixed);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 > 100) {
                break;
            }
            long j3 = j2 + 1;
            createTracker.updateIngestionMetrics(segmentName, 0, j2, j3, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), fixed.millis() - j2);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), fixed.millis() - j3);
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), j2);
            j = j2 + 1;
        }
        long j4 = 100;
        while (true) {
            long j5 = j4;
            if (j5 < 0) {
                break;
            }
            createTracker.updateIngestionMetrics(segmentName, 0, j5, j5 + 1, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), fixed.millis() - j5);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), fixed.millis() - (j5 + 1));
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), j5);
            j4 = j5 - 1;
        }
        createTracker.updateIngestionMetrics(segmentName, 0, 100L, 100L, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        long j6 = 0;
        while (true) {
            long j7 = j6;
            if (j7 > 200) {
                break;
            }
            long j8 = j7 + 1;
            createTracker.updateIngestionMetrics(segmentName2, 1, j7, j8, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(1), fixed.millis() - j7);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(1), fixed.millis() - j8);
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(1), j7);
            j6 = j7 + 1;
        }
        long j9 = 200;
        while (true) {
            long j10 = j9;
            if (j10 < 0) {
                createTracker.shutdown();
                Assert.assertTrue(true);
                return;
            }
            long j11 = j10 + 1;
            createTracker.updateIngestionMetrics(segmentName2, 1, j10, j11, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(1), fixed.millis() - j10);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(1), fixed.millis() - j11);
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(1), j10);
            j9 = j10 - 1;
        }
    }

    @Test
    public void testRecordIngestionDelayWithAging() {
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123L).getSegmentName();
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, 234L).getSegmentName();
        IngestionDelayTracker createTracker = createTracker();
        Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        createTracker.setClock(fixed);
        long millis = fixed.millis() - 1000;
        createTracker.updateIngestionMetrics(segmentName, 0, millis, millis, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 1000L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 1000L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), millis);
        Clock offset = Clock.offset(fixed, Duration.ofMillis(300L));
        createTracker.setClock(offset);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 1300L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 1300L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), millis);
        long millis2 = offset.millis() - 10;
        createTracker.updateIngestionMetrics(segmentName, 0, millis2, millis2, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 10L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 10L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), millis2);
        Clock offset2 = Clock.offset(offset, Duration.ofMillis(1000L));
        createTracker.setClock(offset2);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 1010L);
        long millis3 = offset2.millis() - 11;
        createTracker.updateIngestionMetrics(segmentName2, 1, millis3, millis3, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(1), 11L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(1), 11L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(1), millis3);
        createTracker.setClock(Clock.offset(offset2, Duration.ofMillis(150L)));
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(1), 161L);
        createTracker.shutdown();
    }

    @Test
    public void testStopTrackingIngestionDelay() {
        IngestionDelayTracker createTracker = createTracker();
        Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        createTracker.setClock(fixed);
        for (int i = 0; i <= 100; i++) {
            String segmentName = new LLCSegmentName(RAW_TABLE_NAME, i, 0, 123L).getSegmentName();
            long millis = fixed.millis() - i;
            createTracker.updateIngestionMetrics(segmentName, i, millis, millis, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(i), i);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(i), i);
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(i), millis);
        }
        for (int i2 = TIMER_THREAD_TICK_INTERVAL_MS; i2 >= 0; i2--) {
            createTracker.stopTrackingPartitionIngestionDelay(i2);
        }
        for (int i3 = 0; i3 <= 100; i3++) {
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(i3), 0L);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(i3), 0L);
            Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(i3), Long.MIN_VALUE);
        }
    }

    @Test
    public void testStopTrackingIngestionDelayWithSegment() {
        IngestionDelayTracker createTracker = createTracker();
        Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        createTracker.setClock(fixed);
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123L).getSegmentName();
        long millis = fixed.millis() - 10;
        createTracker.updateIngestionMetrics(segmentName, 0, millis, millis, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 10L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 10L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), millis);
        createTracker.stopTrackingPartitionIngestionDelay(segmentName);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 0L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 0L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
        createTracker.updateIngestionMetrics(segmentName, 0, millis, millis, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
        Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(0), 0L);
        Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(0), 0L);
        Assert.assertEquals(createTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
    }

    @Test
    public void testShutdown() {
        IngestionDelayTracker createTracker = createTracker();
        Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        createTracker.setClock(fixed);
        for (int i = 0; i <= 100; i++) {
            String segmentName = new LLCSegmentName(RAW_TABLE_NAME, i, 0, 123L).getSegmentName();
            long millis = fixed.millis() - i;
            createTracker.updateIngestionMetrics(segmentName, i, millis, millis, (StreamPartitionMsgOffset) null, (StreamPartitionMsgOffset) null);
            Assert.assertEquals(createTracker.getPartitionIngestionDelayMs(i), i);
            Assert.assertEquals(createTracker.getPartitionEndToEndIngestionDelayMs(i), i);
        }
        createTracker.shutdown();
        createTracker().shutdown();
    }

    @Test
    public void testRecordIngestionDelayOffset() {
        String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123L).getSegmentName();
        String segmentName2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, 234L).getSegmentName();
        IngestionDelayTracker createTracker = createTracker();
        createTracker.updateIngestionMetrics(segmentName, 0, Long.MIN_VALUE, Long.MIN_VALUE, new LongMsgOffset(50L), new LongMsgOffset(150L));
        Assert.assertEquals(createTracker.getPartitionIngestionOffsetLag(0), 100L);
        Assert.assertEquals(createTracker.getPartitionIngestionUpstreamOffset(0), 150L);
        Assert.assertEquals(createTracker.getPartitionIngestionConsumingOffset(0), 50L);
        createTracker.updateIngestionMetrics(segmentName2, 1, Long.MIN_VALUE, Long.MIN_VALUE, new LongMsgOffset(50L), new LongMsgOffset(150L));
        Assert.assertEquals(createTracker.getPartitionIngestionOffsetLag(1), 100L);
        Assert.assertEquals(createTracker.getPartitionIngestionUpstreamOffset(1), 150L);
        Assert.assertEquals(createTracker.getPartitionIngestionConsumingOffset(1), 50L);
        createTracker.updateIngestionMetrics(segmentName, 0, Long.MIN_VALUE, Long.MIN_VALUE, new LongMsgOffset(150L), new LongMsgOffset(200L));
        Assert.assertEquals(createTracker.getPartitionIngestionOffsetLag(0), 50L);
        Assert.assertEquals(createTracker.getPartitionIngestionUpstreamOffset(0), 200L);
        Assert.assertEquals(createTracker.getPartitionIngestionConsumingOffset(0), 150L);
        createTracker.shutdown();
    }
}
