package org.apache.pinot.controller.helix.core.realtime.segment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for {@link FlushThresholdUpdateManager} and the updaters it hands out
 * ({@link DefaultFlushThresholdUpdater} and {@link SegmentSizeBasedFlushThresholdUpdater}).
 */
public class FlushThresholdUpdaterTest {
    /** Table name without a type suffix; used to build the segment names in these tests. */
    private static final String RAW_TABLE_NAME = "testTable";

    /** {@link #RAW_TABLE_NAME} converted by {@code TableNameBuilder.REALTIME}; returned by the mocked stream configs. */
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);

    // Simulated segment-size curves, in MB. getSegmentSizeBytes(...) picks the entry at
    // index min(rows / 100000, 19), so each array must hold exactly 20 entries.

    /** Size curve whose increments grow from entry to entry. */
    private static final long[] EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB = {
        50, 60, 70, 83, 98, 120, 160, 200, 250, 310,
        400, 500, 600, 700, 800, 950, 1130, 1400, 1700, 2000
    };

    /** Size curve whose increments shrink from entry to entry. */
    private static final long[] LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB = {
        70, 180, 290, 400, 500, 605, 690, 770, 820, 865,
        895, 920, 940, 955, 970, 980, 1000, 1012, 1020, 1030
    };

    /** Size curve that rises by 100 MB on every second entry. */
    private static final long[] STEPS_SEGMENT_SIZES_MB = {
        100, 100, 200, 200, 300, 300, 400, 400, 500, 500,
        600, 600, 700, 700, 800, 800, 900, 900, 1000, 1000
    };

    /**
     * Verifies which {@link FlushThresholdUpdater} the {@link FlushThresholdUpdateManager} returns:
     * <ul>
     *   <li>a positive row threshold yields a {@link DefaultFlushThresholdUpdater} carrying that threshold;</li>
     *   <li>a zero row threshold yields a {@link SegmentSizeBasedFlushThresholdUpdater} that is reused for
     *       the same table on later calls;</li>
     *   <li>asking for a row-based updater in between, or clearing the table's updater, makes the next
     *       zero-row-threshold request return a new instance.</li>
     * </ul>
     */
    @Test
    public void testFlushThresholdUpdateManager() {
        FlushThresholdUpdateManager manager = new FlushThresholdUpdateManager();

        // Positive row threshold -> row-based updater with the configured size.
        FlushThresholdUpdater rowBasedUpdater = manager.getFlushThresholdUpdater(mockStreamConfig(5000000));
        Assert.assertTrue(rowBasedUpdater instanceof DefaultFlushThresholdUpdater);
        Assert.assertEquals(((DefaultFlushThresholdUpdater) rowBasedUpdater).getTableFlushSize(), 5000000);

        // Row threshold of 0 -> size-based (autotune) updater.
        PartitionLevelStreamConfig autotuneConfig = mockDefaultAutotuneStreamConfig();
        FlushThresholdUpdater autotuneUpdater = manager.getFlushThresholdUpdater(autotuneConfig);
        Assert.assertTrue(autotuneUpdater instanceof SegmentSizeBasedFlushThresholdUpdater);

        // A different autotune config for the same table gets the very same updater back.
        Assert.assertSame(manager.getFlushThresholdUpdater(mockAutotuneStreamConfig(10000L, 10000L, 10000)),
            autotuneUpdater);

        // Switching back to a positive row threshold yields a row-based updater again.
        rowBasedUpdater = manager.getFlushThresholdUpdater(mockStreamConfig(10000));
        Assert.assertTrue(rowBasedUpdater instanceof DefaultFlushThresholdUpdater);
        Assert.assertEquals(((DefaultFlushThresholdUpdater) rowBasedUpdater).getTableFlushSize(), 10000);

        // After the row-based request the previous autotune updater must not be handed out any more.
        FlushThresholdUpdater recreatedUpdater = manager.getFlushThresholdUpdater(autotuneConfig);
        Assert.assertNotSame(recreatedUpdater, autotuneUpdater);
        // ...and the new instance is the one that is reused from now on.
        Assert.assertSame(manager.getFlushThresholdUpdater(autotuneConfig), recreatedUpdater);

        // Clearing the table's updater must force yet another new instance. Comparing against
        // recreatedUpdater (not only the long-gone autotuneUpdater) is what actually exercises the clear.
        manager.clearFlushThresholdUpdater(REALTIME_TABLE_NAME);
        FlushThresholdUpdater updaterAfterClear = manager.getFlushThresholdUpdater(autotuneConfig);
        Assert.assertNotSame(updaterAfterClear, autotuneUpdater);
        Assert.assertNotSame(updaterAfterClear, recreatedUpdater);
    }

    /**
     * Builds a mocked stream config for {@link #REALTIME_TABLE_NAME} with a fixed row threshold.
     *
     * @param i value the mock returns from {@code getFlushThresholdRows()}
     * @return the stubbed mock
     */
    private PartitionLevelStreamConfig mockStreamConfig(int i) {
        PartitionLevelStreamConfig streamConfig = Mockito.mock(PartitionLevelStreamConfig.class);
        Mockito.when(streamConfig.getFlushThresholdRows()).thenReturn(i);
        Mockito.when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
        return streamConfig;
    }

    /**
     * Builds a mocked stream config for {@link #REALTIME_TABLE_NAME} whose row threshold is 0 and whose
     * size, time and initial-row settings are taken from the arguments.
     *
     * @param j  value the mock returns from {@code getFlushThresholdSegmentSizeBytes()}
     * @param j2 value the mock returns from {@code getFlushThresholdTimeMillis()}
     * @param i  value the mock returns from {@code getFlushAutotuneInitialRows()}
     * @return the stubbed mock
     */
    private PartitionLevelStreamConfig mockAutotuneStreamConfig(long j, long j2, int i) {
        PartitionLevelStreamConfig streamConfig = Mockito.mock(PartitionLevelStreamConfig.class);
        Mockito.when(streamConfig.getTableNameWithType()).thenReturn(REALTIME_TABLE_NAME);
        // A row threshold of 0 is what the tests use to request the size-based updater.
        Mockito.when(streamConfig.getFlushThresholdRows()).thenReturn(0);
        Mockito.when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(j);
        Mockito.when(streamConfig.getFlushThresholdTimeMillis()).thenReturn(j2);
        Mockito.when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(i);
        return streamConfig;
    }

    /**
     * Builds the autotune stream config shared by most tests: a 209715200-byte (200 MiB) segment size,
     * {@code StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS} as time threshold and 100000 initial rows.
     *
     * @return the stubbed mock
     */
    private PartitionLevelStreamConfig mockDefaultAutotuneStreamConfig() {
        long segmentSizeBytes = 209715200L;
        int initialRows = 100000;
        return mockAutotuneStreamConfig(segmentSizeBytes, StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS,
            initialRows);
    }

    /**
     * Feeds the size-based updater with simulated segment sizes and checks that the row threshold
     * converges: for each size curve, the first threshold equals the configured initial rows, and after
     * 400 of 500 commit rounds the simulated segment size stays within 1% of the configured segment size.
     */
    @Test
    public void testSegmentSizeBasedFlushThreshold() {
        PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
        long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
        long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99d);
        long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01d);

        // Arrays.asList(a, b, c) yields a List<long[]>; wrapping the curves in a long[] initializer
        // (as the decompiler did) is not valid Java.
        List<long[]> sizeCurves = Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
            LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB);
        for (long[] segmentSizesMB : sizeCurves) {
            SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();

            // First segment: there is no committing segment metadata yet.
            SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
            updater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, getCommittingSegmentDescriptor(0L),
                (SegmentZKMetadata) null, 1, Collections.emptyList());
            Assert.assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
                streamConfig.getFlushAutotuneInitialRows());

            for (int round = 0; round < 500; round++) {
                // The segment that just "committed" consumed exactly as many rows as its threshold allowed.
                int numRowsConsumed = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
                long segmentSizeBytes = getSegmentSizeBytes(numRowsConsumed, segmentSizesMB);
                updater.updateFlushThreshold(streamConfig, newSegmentZKMetadata,
                    getCommittingSegmentDescriptor(segmentSizeBytes),
                    getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed), 1,
                    Collections.emptyList());

                // Only the tail of the run is expected to have converged.
                if (round > 400) {
                    Assert.assertTrue(segmentSizeBytes > segmentSizeLowerLimit && segmentSizeBytes < segmentSizeHigherLimit,
                        "round " + round + ": segment size " + segmentSizeBytes + " outside ("
                            + segmentSizeLowerLimit + ", " + segmentSizeHigherLimit + ")");
                }
            }
        }
    }

    /**
     * Same convergence check as {@code testSegmentSizeBasedFlushThreshold}, but for a segment of
     * partition 1 while the partition group metadata list holds ids 1, 2 and 3 (so partition 1 is the
     * smallest listed id rather than partition 0). For each size curve, the first threshold equals the
     * configured initial rows, and after 400 of 500 commit rounds the simulated segment size stays
     * within 1% of the configured segment size.
     */
    @Test
    public void testSegmentSizeBasedFlushThresholdMinPartition() {
        PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
        long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
        long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99d);
        long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01d);

        // Arrays.asList(a, b, c) yields a List<long[]>; wrapping the curves in a long[] initializer
        // (as the decompiler did) is not valid Java.
        List<long[]> sizeCurves = Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB,
            LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB);
        for (long[] segmentSizesMB : sizeCurves) {
            SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();

            // First segment of partition 1: there is no committing segment metadata yet.
            SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
            updater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, getCommittingSegmentDescriptor(0L),
                (SegmentZKMetadata) null, 1, getPartitionGroupMetadataList(3, 1));
            Assert.assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
                streamConfig.getFlushAutotuneInitialRows());

            for (int round = 0; round < 500; round++) {
                // The segment that just "committed" consumed exactly as many rows as its threshold allowed.
                int numRowsConsumed = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
                long segmentSizeBytes = getSegmentSizeBytes(numRowsConsumed, segmentSizesMB);
                updater.updateFlushThreshold(streamConfig, newSegmentZKMetadata,
                    getCommittingSegmentDescriptor(segmentSizeBytes),
                    getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed), 1,
                    getPartitionGroupMetadataList(3, 1));

                // Only the tail of the run is expected to have converged.
                if (round > 400) {
                    Assert.assertTrue(segmentSizeBytes > segmentSizeLowerLimit && segmentSizeBytes < segmentSizeHigherLimit,
                        "round " + round + ": segment size " + segmentSizeBytes + " outside ("
                            + segmentSizeLowerLimit + ", " + segmentSizeHigherLimit + ")");
                }
            }
        }
    }

    /**
     * Creates fresh segment metadata named after an LLC segment of {@link #RAW_TABLE_NAME}.
     *
     * @param i partition id passed to the {@code LLCSegmentName} constructor (sequence number is 0 and
     *          the timestamp is the current time)
     * @return metadata carrying only the generated segment name
     */
    private SegmentZKMetadata getNewSegmentZKMetadata(int i) {
        LLCSegmentName llcSegmentName = new LLCSegmentName(RAW_TABLE_NAME, i, 0, System.currentTimeMillis());
        String segmentName = llcSegmentName.getSegmentName();
        return new SegmentZKMetadata(segmentName);
    }

    /**
     * Builds partition group metadata for {@code i} consecutive partition ids starting at {@code i2}.
     * Every entry is created with a {@code null} start offset.
     *
     * @param i  number of entries to create; a value of 0 or less yields an empty list
     * @param i2 id of the first entry; following entries use {@code i2 + 1}, {@code i2 + 2}, ...
     * @return a new mutable list holding the entries in ascending id order
     */
    private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int i, int i2) {
        // Typed list instead of a raw ArrayList, so no unchecked conversion happens on return.
        List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
        for (int index = 0; index < i; index++) {
            partitionGroupMetadataList.add(new PartitionGroupMetadata(i2 + index, (StreamPartitionMsgOffset) null));
        }
        return partitionGroupMetadataList;
    }

    /**
     * Creates a committing-segment descriptor with a {@code null} first argument, the string form of
     * {@code LongMsgOffset(0)} as second argument and the given size as third.
     *
     * @param j segment size in bytes to report
     * @return the descriptor
     */
    private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long j) {
        String offset = new LongMsgOffset(0L).toString();
        return new CommittingSegmentDescriptor((String) null, offset, j);
    }

    /**
     * Creates metadata standing in for the segment that is being committed.
     *
     * @param j  creation time to set, in the unit of {@code System.currentTimeMillis()} as used by callers
     * @param i  row threshold the segment was consuming with
     * @param i2 total number of docs in the segment
     * @return metadata named {@code "ignored"} with the three values applied
     */
    private SegmentZKMetadata getCommittingSegmentZKMetadata(long j, int i, int i2) {
        SegmentZKMetadata committingSegmentZKMetadata = new SegmentZKMetadata("ignored");
        committingSegmentZKMetadata.setTotalDocs(i2);
        committingSegmentZKMetadata.setSizeThresholdToFlushSegment(i);
        committingSegmentZKMetadata.setCreationTime(j);
        return committingSegmentZKMetadata;
    }

    /**
     * Simulates the size of a segment that consumed {@code i} rows, based on a size curve in MB.
     *
     * <p>Below 100000 rows the size is {@code jArr[0]} scaled by {@code i / 100000}. From 100000 rows on,
     * the entry at index {@code min(i / 100000, 19)} is taken as base and the difference to the previous
     * entry, per 100000 rows, is added for every row beyond that index's multiple of 100000. For 2000000
     * rows or more this keeps extending the last step.
     *
     * @param i    number of rows consumed
     * @param jArr size curve in MB; must hold at least 20 entries once {@code i} reaches 1900000
     * @return the simulated size in bytes (MB value times 1024 * 1024, truncated)
     */
    private long getSegmentSizeBytes(int i, long[] jArr) {
        final double rowsPerEntry = 100000.0d;
        double sizeMb;
        if (i >= 100000) {
            int entry = Math.min(i / 100000, 19);
            long entrySizeMb = jArr[entry];
            double mbPerRow = (entrySizeMb - jArr[entry - 1]) / rowsPerEntry;
            int rowsPastEntry = i - (entry * 100000);
            sizeMb = entrySizeMb + (mbPerRow * rowsPastEntry);
        } else {
            sizeMb = (jArr[0] / rowsPerEntry) * i;
        }
        return (long) (sizeMb * 1024.0d * 1024.0d);
    }

    /**
     * Checks the threshold chosen when the committing segment holds fewer rows than its threshold
     * (presumably because it was flushed by time rather than by row count): the new threshold is the
     * committed row count times 1.1. When the next committing segment did fill its threshold, the
     * threshold must not simply grow by 1.1 again.
     */
    @Test
    public void testTimeThreshold() {
        PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
        SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();
        SegmentZKMetadata segmentZKMetadata = getNewSegmentZKMetadata(0);

        // First segment: no committing segment metadata.
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, getCommittingSegmentDescriptor(0L),
            (SegmentZKMetadata) null, 1, Collections.emptyList());
        int initialThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();

        // Committing segment stopped at 15000 rows, short of its threshold.
        final int committedRows = 15000;
        CommittingSegmentDescriptor descriptor = getCommittingSegmentDescriptor(128000L);
        SegmentZKMetadata shortSegment =
            getCommittingSegmentZKMetadata(System.currentTimeMillis(), initialThreshold, committedRows);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, descriptor, shortSegment, 1,
            Collections.emptyList());
        int thresholdAfterShortSegment = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertEquals(thresholdAfterShortSegment, (int) (committedRows * 1.1d));

        // Committing segment reached its threshold this time.
        SegmentZKMetadata fullSegment = getCommittingSegmentZKMetadata(System.currentTimeMillis(),
            thresholdAfterShortSegment, thresholdAfterShortSegment);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, descriptor, fullSegment, 1,
            Collections.emptyList());
        int thresholdAfterFullSegment = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertNotEquals(Integer.valueOf(thresholdAfterFullSegment),
            Integer.valueOf((int) (thresholdAfterShortSegment * 1.1d)));
    }

    /**
     * Checks that very small committing segments (128 bytes with 15, then 20 rows) leave the new
     * segment's threshold at 10000 rows both times (presumably the updater's lower bound).
     */
    @Test
    public void testMinThreshold() {
        PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
        SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();
        SegmentZKMetadata segmentZKMetadata = getNewSegmentZKMetadata(0);

        // First segment: no committing segment metadata.
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, getCommittingSegmentDescriptor(0L),
            (SegmentZKMetadata) null, 1, Collections.emptyList());
        int initialThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();

        final int expectedThreshold = 10000;
        CommittingSegmentDescriptor tinyDescriptor = getCommittingSegmentDescriptor(128L);

        // Committing segment with only 15 rows.
        SegmentZKMetadata firstTinySegment =
            getCommittingSegmentZKMetadata(System.currentTimeMillis(), initialThreshold, 15);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, tinyDescriptor, firstTinySegment, 1,
            Collections.emptyList());
        int thresholdAfterFirst = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertEquals(thresholdAfterFirst, expectedThreshold);

        // Committing segment with only 20 rows.
        SegmentZKMetadata secondTinySegment =
            getCommittingSegmentZKMetadata(System.currentTimeMillis(), thresholdAfterFirst, 20);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, tinyDescriptor, secondTinySegment, 1,
            Collections.emptyList());
        Assert.assertEquals(segmentZKMetadata.getSizeThresholdToFlushSegment(), expectedThreshold);
    }

    /**
     * Checks how commits from different partitions affect the updater's rows-to-size ratio:
     * <ol>
     *   <li>first segments of partitions 0 and 1 both get the initial rows and leave the ratio at 0;</li>
     *   <li>a commit from partition 1 makes the ratio positive;</li>
     *   <li>a further commit from partition 1 leaves the ratio unchanged;</li>
     *   <li>a commit from partition 0 changes the ratio.</li>
     * </ol>
     */
    @Test
    public void testNonZeroPartitionUpdates() {
        PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
        SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();

        // Step 1: first segment for each partition, no committing segment metadata.
        SegmentZKMetadata partition0Metadata = getNewSegmentZKMetadata(0);
        SegmentZKMetadata partition1Metadata = getNewSegmentZKMetadata(1);
        CommittingSegmentDescriptor emptyDescriptor = getCommittingSegmentDescriptor(0L);
        updater.updateFlushThreshold(streamConfig, partition0Metadata, emptyDescriptor, (SegmentZKMetadata) null, 1,
            Collections.emptyList());
        updater.updateFlushThreshold(streamConfig, partition1Metadata, emptyDescriptor, (SegmentZKMetadata) null, 1,
            Collections.emptyList());
        int partition0Threshold = partition0Metadata.getSizeThresholdToFlushSegment();
        int partition1Threshold = partition1Metadata.getSizeThresholdToFlushSegment();
        double initialRatio = updater.getLatestSegmentRowsToSizeRatio();
        Assert.assertEquals(partition0Threshold, streamConfig.getFlushAutotuneInitialRows());
        Assert.assertEquals(partition1Threshold, streamConfig.getFlushAutotuneInitialRows());
        Assert.assertEquals(Double.valueOf(initialRatio), Double.valueOf(0.0d));

        // Step 2: partition 1 commits a 128000000-byte segment.
        SegmentZKMetadata partition1Commit =
            getCommittingSegmentZKMetadata(System.currentTimeMillis(), partition1Threshold, partition1Threshold);
        updater.updateFlushThreshold(streamConfig, partition1Metadata, getCommittingSegmentDescriptor(128000000L),
            partition1Commit, 1, Collections.emptyList());
        int partition1NextThreshold = partition1Metadata.getSizeThresholdToFlushSegment();
        double ratioAfterPartition1 = updater.getLatestSegmentRowsToSizeRatio();
        Assert.assertTrue(ratioAfterPartition1 > 0.0d);

        // Step 3: partition 1 commits again, now with 256000000 bytes.
        CommittingSegmentDescriptor largerDescriptor = getCommittingSegmentDescriptor(256000000L);
        SegmentZKMetadata partition1SecondCommit = getCommittingSegmentZKMetadata(System.currentTimeMillis(),
            partition1NextThreshold, partition1NextThreshold);
        updater.updateFlushThreshold(streamConfig, partition1Metadata, largerDescriptor, partition1SecondCommit, 1,
            Collections.emptyList());
        Assert.assertEquals(Double.valueOf(updater.getLatestSegmentRowsToSizeRatio()),
            Double.valueOf(ratioAfterPartition1));

        // Step 4: partition 0 commits with the same descriptor.
        SegmentZKMetadata partition0Commit =
            getCommittingSegmentZKMetadata(System.currentTimeMillis(), partition0Threshold, partition0Threshold);
        updater.updateFlushThreshold(streamConfig, partition0Metadata, largerDescriptor, partition0Commit, 1,
            Collections.emptyList());
        Assert.assertNotEquals(Double.valueOf(updater.getLatestSegmentRowsToSizeRatio()),
            Double.valueOf(ratioAfterPartition1));
    }

    /**
     * Checks that the size-based updater reacts to stream config changes between commits:
     * <ol>
     *   <li>the first segment gets the configured initial rows (50000);</li>
     *   <li>a segment at 90% of the size threshold, created 90% of the time threshold ago, raises the
     *       row threshold;</li>
     *   <li>lowering the configured size threshold below that segment's size lowers the row threshold;</li>
     *   <li>a segment that stops at 90% of its row threshold yields committed rows times 1.1;</li>
     *   <li>halving the configured time threshold then yields a row threshold below the committed rows.</li>
     * </ol>
     */
    @Test
    public void testSegmentSizeBasedUpdaterWithModifications() {
        SegmentSizeBasedFlushThresholdUpdater updater = new SegmentSizeBasedFlushThresholdUpdater();

        // Step 1: first segment with a 104857600-byte size threshold and half the default time threshold.
        final long sizeThresholdBytes = 104857600L;
        final int initialRows = 50000;
        long timeThresholdMillis = StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS / 2;
        PartitionLevelStreamConfig streamConfig =
            mockAutotuneStreamConfig(sizeThresholdBytes, timeThresholdMillis, initialRows);
        SegmentZKMetadata segmentZKMetadata = getNewSegmentZKMetadata(0);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, getCommittingSegmentDescriptor(0L),
            (SegmentZKMetadata) null, 1, Collections.emptyList());
        int firstThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertEquals(firstThreshold, initialRows);

        // Step 2: committed segment is at 90% of the size threshold and 90% of the time threshold old.
        long committedSizeBytes = (sizeThresholdBytes * 9) / 10;
        long creationTimeMillis = System.currentTimeMillis() - ((timeThresholdMillis * 9) / 10);
        CommittingSegmentDescriptor descriptor = getCommittingSegmentDescriptor(committedSizeBytes);
        updater.updateFlushThreshold(streamConfig, segmentZKMetadata, descriptor,
            getCommittingSegmentZKMetadata(creationTimeMillis, firstThreshold, firstThreshold), 1,
            Collections.emptyList());
        int secondThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertTrue(secondThreshold > firstThreshold);

        // Step 3: the configured size threshold drops to 90% of the committed segment's size.
        long reducedSizeThresholdBytes = (committedSizeBytes * 9) / 10;
        PartitionLevelStreamConfig reducedSizeConfig =
            mockAutotuneStreamConfig(reducedSizeThresholdBytes, timeThresholdMillis, initialRows);
        updater.updateFlushThreshold(reducedSizeConfig, segmentZKMetadata, descriptor,
            getCommittingSegmentZKMetadata(creationTimeMillis, secondThreshold, secondThreshold), 1,
            Collections.emptyList());
        int thirdThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertTrue(thirdThreshold < secondThreshold);

        // Step 4: committed segment stops at 90% of its row threshold and 90% of the reduced size.
        int committedRows = (thirdThreshold * 9) / 10;
        CommittingSegmentDescriptor smallerDescriptor =
            getCommittingSegmentDescriptor((reducedSizeThresholdBytes * 9) / 10);
        updater.updateFlushThreshold(reducedSizeConfig, segmentZKMetadata, smallerDescriptor,
            getCommittingSegmentZKMetadata(creationTimeMillis, thirdThreshold, committedRows), 1,
            Collections.emptyList());
        int fourthThreshold = segmentZKMetadata.getSizeThresholdToFlushSegment();
        Assert.assertEquals(fourthThreshold, (long) (committedRows * 1.1d));

        // Step 5: same kind of commit, but the configured time threshold is halved.
        int nextCommittedRows = (fourthThreshold * 9) / 10;
        PartitionLevelStreamConfig reducedTimeConfig =
            mockAutotuneStreamConfig(reducedSizeThresholdBytes, timeThresholdMillis / 2, initialRows);
        updater.updateFlushThreshold(reducedTimeConfig, segmentZKMetadata, smallerDescriptor,
            getCommittingSegmentZKMetadata(creationTimeMillis, fourthThreshold, nextCommittedRows), 1,
            Collections.emptyList());
        Assert.assertTrue(segmentZKMetadata.getSizeThresholdToFlushSegment() < nextCommittedRows);
    }
}
