package org.apache.pinot.controller.helix.core.realtime.segment;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.spi.stream.StreamConfig;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.class */
public class SegmentFlushThresholdComputerTest {
    @Test
    public void testUseAutoTuneInitialRowsIfFirstSegmentInPartition() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Integer.valueOf(streamConfig.getFlushAutotuneInitialRows())).thenReturn(1000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class), (SegmentZKMetadata) null, "newSegmentName"), 1000);
    }

    @Test
    public void testUseLastSegmentSizeTimesRatioIfFirstSegmentInPartitionAndNewPartitionGroup() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer(Clock.systemUTC(), 1.5d);
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(20000L);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class), (SegmentZKMetadata) null, "newSegmentName"), 30000);
    }

    @Test
    public void testUseLastSegmentSizeTimesRatioIfFirstSegmentInPartitionAndNewPartitionGroupMinimumSize10000Rows() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer(Clock.systemUTC(), 1.5d);
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(2000L);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class), (SegmentZKMetadata) null, "newSegmentName"), 10000);
    }

    @Test
    public void testUseLastSegmentsThresholdIfSegmentSizeMissing() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdTimeMillis())).thenReturn(123L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(0L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(5000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "newSegmentName"), 5000);
    }

    @Test
    public void testUseLastSegmentsThresholdIfSegmentIsCommittingDueToForceCommit() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(500000L);
        Mockito.when(committingSegmentDescriptor.getStopReason()).thenReturn("forceCommitMessageReceived");
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(25000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold((StreamConfig) Mockito.mock(StreamConfig.class), committingSegmentDescriptor, segmentZKMetadata, "newSegmentName"), 25000);
    }

    @Test
    public void testApplyMultiplierToTotalDocsWhenTimeThresholdNotReached() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer(Clock.fixed(Instant.ofEpochMilli(1640216032391L), ZoneId.of("UTC")));
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdTimeMillis())).thenReturn(Long.valueOf(TimeUnit.MILLISECONDS.convert(6L, TimeUnit.HOURS)));
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(2000000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(Long.valueOf(ControllerTest.TIMEOUT_MS));
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(20000);
        Mockito.when(Long.valueOf(segmentZKMetadata.getCreationTime())).thenReturn(Long.valueOf(1640216032391L - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS)));
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 11000);
    }

    @Test
    public void testApplyMultiplierToAdjustedTotalDocsWhenTimeThresholdIsReached() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer(Clock.fixed(Instant.ofEpochMilli(1640216032391L), ZoneId.of("UTC")));
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdTimeMillis())).thenReturn(Long.valueOf(TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS)));
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(2000000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(30000L);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(60000);
        Mockito.when(Long.valueOf(segmentZKMetadata.getCreationTime())).thenReturn(Long.valueOf(1640216032391L - TimeUnit.MILLISECONDS.convert(2L, TimeUnit.HOURS)));
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 16500);
    }

    @Test
    public void testSegmentSizeTooSmall() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(5000000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(30000L);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(20000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 15000);
    }

    @Test
    public void testSegmentSizeTooBig() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(5000000L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(2000000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(30000L);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(20000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 45000);
    }

    @Test
    public void testSegmentSizeJustRight() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(2500000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(30000L);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(20000);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 36000);
    }

    @Test
    public void testNoRows() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(2500000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(0L);
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(0);
        Assert.assertEquals(segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z"), 10000);
    }

    @Test
    public void testAdjustRowsToSizeRatio() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
        Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(3000000L);
        CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
        Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(200000L);
        SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
        Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(30000L, new Long[]{50000L});
        Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(60000);
        segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z");
        Assert.assertEquals(segmentFlushThresholdComputer.getLatestSegmentRowsToSizeRatio(), 0.15d);
        segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z");
        Assert.assertEquals(segmentFlushThresholdComputer.getLatestSegmentRowsToSizeRatio(), 0.16d);
    }

    @Test(invocationCount = 1000)
    public void testSegmentFlushThresholdVariance() {
        SegmentFlushThresholdComputer segmentFlushThresholdComputer = new SegmentFlushThresholdComputer();
        double d = 0.0d;
        while (true) {
            double d2 = d;
            if (d2 > 0.5d) {
                return;
            }
            StreamConfig streamConfig = (StreamConfig) Mockito.mock(StreamConfig.class);
            Mockito.when(Long.valueOf(streamConfig.getFlushThresholdSegmentSizeBytes())).thenReturn(2000000L);
            Mockito.when(streamConfig.getStreamConfigsMap()).thenReturn(Map.of("realtime.segment.flush.threshold.variance.percentage", String.valueOf(d2)));
            CommittingSegmentDescriptor committingSegmentDescriptor = (CommittingSegmentDescriptor) Mockito.mock(CommittingSegmentDescriptor.class);
            Mockito.when(Long.valueOf(committingSegmentDescriptor.getSegmentSizeBytes())).thenReturn(300000L);
            SegmentZKMetadata segmentZKMetadata = (SegmentZKMetadata) Mockito.mock(SegmentZKMetadata.class);
            Mockito.when(Long.valueOf(segmentZKMetadata.getTotalDocs())).thenReturn(60000L, new Long[]{50000L});
            Mockito.when(Integer.valueOf(segmentZKMetadata.getSizeThresholdToFlushSegment())).thenReturn(60000);
            int computeThreshold = segmentFlushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, segmentZKMetadata, "events3__0__0__20211222T1646Z");
            Assert.assertTrue(((double) computeThreshold) >= (1.0d - d2) * ((double) 90000) && ((double) computeThreshold) <= (1.0d + d2) * ((double) 90000));
            d = d2 + 0.05d;
        }
    }
}
