package org.apache.pinot.core.data.manager.realtime;

import com.google.common.cache.LoadingCache;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.Fixtures;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.apache.zookeeper.data.Stat;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.class */
public class LLRealtimeSegmentDataManagerTest {
    // Scratch directory handed to every fake segment manager as its segment directory.
    private static final String SEGMENT_DIR = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName();
    private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);

    // Components of the single LLC segment name shared by all tests in this class.
    private static final String TABLE_NAME = "Coffee";
    private static final int PARTITION_GROUP_ID = 0;
    private static final int SEQUENCE_ID = 945;
    private static final long SEG_TIME_MS = 98347869999L;
    private static final LLCSegmentName SEGMENT_NAME = new LLCSegmentName(TABLE_NAME, PARTITION_GROUP_ID, SEQUENCE_ID, SEG_TIME_MS);
    private static final String SEGMENT_NAME_STR = SEGMENT_NAME.getSegmentName();

    // Start offset written into the segment ZK metadata by createZkMetadata().
    private static final long START_OFFSET_VALUE = 198;
    private static final LongMsgOffset START_OFFSET = new LongMsgOffset(START_OFFSET_VALUE);

    // One semaphore per partition group id, shared by every manager this test instance creates.
    // Uses the diamond instead of a raw ConcurrentHashMap so the assignment is type-checked.
    private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest$FakeLLRealtimeSegmentDataManager.class */
    /**
     * Scriptable stand-in for {@link LLRealtimeSegmentDataManager} used by the tests in this file.
     *
     * <p>Tests queue the offsets that successive consume-loop iterations should report in
     * {@link #_consumeOffsets} and the controller responses to hand back in {@link #_responses}.
     * Once both queues are empty the fake sets the superclass's {@code _shouldStop} flag. The
     * {@code _*Called} flags record which of the stubbed operations were invoked.
     *
     * <p>Private superclass state is read and written through reflection; any reflective failure
     * is turned into a test failure that carries the underlying exception as its cause.
     *
     * <p>NOTE(review): the protected methods below are meant to replace the superclass hooks of the
     * same name, but they carry no {@code @Override}, so their signatures are not compiler-checked
     * against the superclass — confirm before relying on them.
     */
    public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
        // Reflective handles on private fields of LLRealtimeSegmentDataManager.
        public Field _state;
        public Field _shouldStop;
        public Field _stopReason;
        private Field _streamMsgOffsetFactory;

        // Scripted inputs: offsets reported by consumeLoop() and responses returned to the state machine.
        public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
        public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();

        // Recorders and switches. The initializers are explicit so each value is (re)applied after
        // the superclass constructor has finished, exactly as an assignment in the constructor body would be.
        public boolean _commitSegmentCalled = false;
        public boolean _buildSegmentCalled = false;
        public boolean _failSegmentBuild = false;
        public boolean _buildAndReplaceCalled = false;
        public int _stopWaitTimeMs = 100;
        private boolean _downloadAndReplaceCalled = false;
        public boolean _throwExceptionFromConsume = false;
        public boolean _postConsumeStoppedCalled = false;
        public Map<Integer, Semaphore> _semaphoreMap;
        public boolean _stubConsumeLoop = true;
        private TimeSupplier _timeSupplier;

        /** Applies one write to a reflectively located field; lets the typed setters share lookup and error handling. */
        private interface FieldWriter {
            void write(Field field) throws IllegalAccessException;
        }

        /**
         * Builds a mocked {@link InstanceDataManagerConfig} that returns {@code null} for read mode,
         * average multi-value count and segment format version, {@code false} for split commit and
         * off-heap allocation, and an empty {@link PinotConfiguration}.
         */
        private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
            InstanceDataManagerConfig config = Mockito.mock(InstanceDataManagerConfig.class);
            Mockito.when(config.getReadMode()).thenReturn(null);
            Mockito.when(config.getAvgMultiValueCount()).thenReturn(null);
            Mockito.when(config.getSegmentFormatVersion()).thenReturn(null);
            Mockito.when(config.isEnableSplitCommit()).thenReturn(false);
            Mockito.when(config.isRealtimeOffHeapAllocation()).thenReturn(false);
            Mockito.when(config.getConfig()).thenReturn(new PinotConfiguration());
            return config;
        }

        /** Looks up a declared field of {@link LLRealtimeSegmentDataManager} and makes it accessible. */
        private static Field accessibleSuperField(String fieldName) throws NoSuchFieldException {
            Field field = LLRealtimeSegmentDataManager.class.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field;
        }

        /**
         * Creates the fake and wires up the reflective handles.
         *
         * @param segmentZKMetadata ZK metadata forwarded to the superclass
         * @param tableConfig table config forwarded to the superclass and used for the index loading config
         * @param realtimeTableDataManager table data manager forwarded to the superclass
         * @param str forwarded to the superclass as its fourth argument (callers in this file pass the segment directory)
         * @param schema schema forwarded to the superclass
         * @param lLCSegmentName segment name; its partition group id selects the semaphore from {@code map}
         * @param map partition group id to semaphore; also kept in {@link #_semaphoreMap}
         * @param serverMetrics metrics forwarded to the superclass
         * @param timeSupplier fake clock used by {@link #now()}, {@link #hold()} and {@link #stop()}
         * @throws Exception if the superclass constructor fails or a reflected field cannot be found or set
         */
        public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String str, Schema schema, LLCSegmentName lLCSegmentName, Map<Integer, Semaphore> map, ServerMetrics serverMetrics, TimeSupplier timeSupplier) throws Exception {
            super(segmentZKMetadata, tableConfig, realtimeTableDataManager, str, new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, lLCSegmentName, map.get(lLCSegmentName.getPartitionGroupId()), serverMetrics, (PartitionUpsertMetadataManager) null, (PartitionDedupMetadataManager) null);
            _state = accessibleSuperField("_state");
            _shouldStop = accessibleSuperField("_shouldStop");
            _stopReason = accessibleSuperField("_stopReason");
            _semaphoreMap = map;
            // Replace whatever offset factory the superclass created with the Long-based one.
            _streamMsgOffsetFactory = accessibleSuperField("_streamPartitionMsgOffsetFactory");
            _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
            _timeSupplier = timeSupplier;
        }

        /**
         * Reads the superclass's {@code _stopReason} field.
         *
         * @return the stop reason; fails the test if the field cannot be read
         */
        public String getStopReason() {
            try {
                return (String) _stopReason.get(this);
            } catch (Exception e) {
                Assert.fail("Could not read _stopReason reflectively", e);
                return null;
            }
        }

        /** Creates a partition consumer bound to this manager; tests drive it by calling {@code run()} directly. */
        public LLRealtimeSegmentDataManager.PartitionConsumer createPartitionConsumer() {
            return new LLRealtimeSegmentDataManager.PartitionConsumer(this);
        }

        /**
         * Runs the real {@code buildSegmentForCommit} of the superclass.
         *
         * @param j forwarded unchanged to {@code buildSegmentForCommit}
         * @return the descriptor reported by {@code getSegmentBuildDescriptor()} afterwards
         */
        public LLRealtimeSegmentDataManager.SegmentBuildDescriptor invokeBuildForCommit(long j) {
            super.buildSegmentForCommit(j);
            return getSegmentBuildDescriptor();
        }

        /**
         * Runs the real {@code commitSegment} of the superclass (bypassing the stub in this class),
         * using the controller VIP URL of a mocked response and {@code false} as the second argument.
         *
         * @return whatever the superclass's {@code commitSegment} returns
         */
        public boolean invokeCommit() {
            SegmentCompletionProtocol.Response response = Mockito.mock(SegmentCompletionProtocol.Response.class);
            Mockito.when(response.isSplitCommit()).thenReturn(false);
            return super.commitSegment(response.getControllerVipUrl(), false);
        }

        /** Sets the superclass's {@code _shouldStop} flag once both scripted queues are drained. */
        private void terminateLoopIfNecessary() {
            if (_consumeOffsets.isEmpty() && _responses.isEmpty()) {
                try {
                    _shouldStop.set(this, true);
                } catch (Exception e) {
                    Assert.fail("Could not set _shouldStop reflectively", e);
                }
            }
        }

        /** Intentionally empty: tests run the partition consumer themselves. */
        protected void startConsumerThread() {
        }

        /**
         * With {@link #_stubConsumeLoop} set, advances the current offset to the next scripted
         * offset instead of consuming; otherwise delegates to the real consume loop.
         *
         * @return {@code true} from the stub, or the superclass result when not stubbed
         * @throws Exception a {@link PermanentConsumerException} when
         *         {@link #_throwExceptionFromConsume} is set, or whatever the real loop throws
         */
        protected boolean consumeLoop() throws Exception {
            if (!_stubConsumeLoop) {
                return super.consumeLoop();
            }
            if (_throwExceptionFromConsume) {
                throw new PermanentConsumerException(new Throwable("Offset out of range"));
            }
            setCurrentOffset(_consumeOffsets.remove().getOffset());
            terminateLoopIfNecessary();
            return true;
        }

        /** Returns the next scripted response and stops the loop if nothing is left to script. */
        protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
            SegmentCompletionProtocol.Response next = _responses.remove();
            terminateLoopIfNecessary();
            return next;
        }

        /** Returns the next scripted response; both arguments are ignored. */
        protected SegmentCompletionProtocol.Response commit(String str, boolean z) {
            return _responses.remove();
        }

        /** Records that the stop-consumed message was posted; the argument is ignored. */
        protected void postStopConsumedMsg(String str) {
            _postConsumeStoppedCalled = true;
        }

        /**
         * Returns the fake clock's time. {@code _timeSupplier} is only assigned at the end of this
         * class's constructor, so any call made while the superclass constructor is still running
         * sees {@code null} and falls back to the wall clock.
         */
        protected long now() {
            if (_timeSupplier == null) {
                return System.currentTimeMillis();
            }
            return _timeSupplier.get();
        }

        /** Advances the fake clock by 5000 ms. */
        protected void hold() {
            _timeSupplier.add(5000L);
        }

        /** Records the call and reports success without building anything. */
        protected boolean buildSegmentAndReplace() {
            _buildAndReplaceCalled = true;
            return true;
        }

        /**
         * Records the call and fabricates a build descriptor at the current offset.
         *
         * @param z when {@code true}, an empty file named {@code segmentFile} is created under the
         *          test's segment directory and attached to the descriptor; otherwise the
         *          descriptor carries no file
         * @return the descriptor, or {@code null} when {@link #_failSegmentBuild} is set
         */
        protected LLRealtimeSegmentDataManager.SegmentBuildDescriptor buildSegmentInternal(boolean z) {
            _buildSegmentCalled = true;
            if (_failSegmentBuild) {
                return null;
            }
            File segmentFile = null;
            if (z) {
                segmentFile = new File(LLRealtimeSegmentDataManagerTest.SEGMENT_DIR, "segmentFile");
                try {
                    segmentFile.createNewFile();
                } catch (IOException e) {
                    Assert.fail("Could not create file " + segmentFile, e);
                }
            }
            return new LLRealtimeSegmentDataManager.SegmentBuildDescriptor(this, segmentFile, (Map) null, getCurrentOffset(), 0L, 0L, -1L);
        }

        /** Records the call and reports success; both arguments are ignored. */
        protected boolean commitSegment(String str, boolean z) {
            _commitSegmentCalled = true;
            return true;
        }

        /** Records the call; nothing is downloaded. */
        protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata) {
            _downloadAndReplaceCalled = true;
        }

        /** Advances the fake clock by {@link #_stopWaitTimeMs}; does not call the superclass's {@code stop()}. */
        public void stop() {
            _timeSupplier.add(_stopWaitTimeMs);
        }

        /** Sets the superclass's {@code _currentOffset} to a {@link LongMsgOffset} of {@code j}. */
        public void setCurrentOffset(long j) {
            setOffset(j, "_currentOffset");
        }

        /** Sets the superclass's {@code _consumeEndTime} field. */
        public void setConsumeEndTime(long j) {
            setLong(j, "_consumeEndTime");
        }

        /** Sets the superclass's {@code _numRowsConsumed} field. */
        public void setNumRowsConsumed(int i) {
            setInt(i, "_numRowsConsumed");
        }

        /** Sets the superclass's {@code _numRowsIndexed} field. */
        public void setNumRowsIndexed(int i) {
            setInt(i, "_numRowsIndexed");
        }

        /** Sets the superclass's {@code _finalOffset} to a {@link LongMsgOffset} of {@code j}. */
        public void setFinalOffset(long j) {
            setOffset(j, "_finalOffset");
        }

        /**
         * Invokes the superclass's no-argument {@code endCriteriaReached} method reflectively.
         *
         * @return the method's boolean result; fails the test if it cannot be invoked
         */
        public boolean invokeEndCriteriaReached() {
            try {
                Method endCriteriaReached = LLRealtimeSegmentDataManager.class.getDeclaredMethod("endCriteriaReached");
                endCriteriaReached.setAccessible(true);
                return (Boolean) endCriteriaReached.invoke(this);
            } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                Assert.fail("Could not invoke endCriteriaReached reflectively", e);
                // Assert.fail always throws; this only satisfies the compiler's return analysis.
                throw new RuntimeException("Cannot get here", e);
            }
        }

        /** Sets the superclass's {@code _segmentMaxRowCount} field. */
        public void setSegmentMaxRowCount(int i) {
            setInt(i, "_segmentMaxRowCount");
        }

        /** Locates the named superclass field and applies {@code writer}; a reflective failure fails the test. */
        private void writeSuperField(String fieldName, FieldWriter writer) {
            try {
                writer.write(accessibleSuperField(fieldName));
            } catch (IllegalAccessException | NoSuchFieldException e) {
                Assert.fail("Could not set field " + fieldName + " reflectively", e);
            }
        }

        private void setLong(long j, String str) {
            writeSuperField(str, field -> field.setLong(this, j));
        }

        private void setOffset(long j, String str) {
            writeSuperField(str, field -> field.set(this, new LongMsgOffset(j)));
        }

        private void setInt(int i, String str) {
            writeSuperField(str, field -> field.setInt(this, i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest$TimeSupplier.class */
    /**
     * Manually driven clock for the tests. It starts at the wall-clock time of its creation and
     * afterwards only moves when {@link #set(long)} or {@link #add(long)} is called. Every
     * {@link #get()} is counted in {@code _timeCheckCounter}.
     */
    public static class TimeSupplier implements Supplier<Long> {
        protected final AtomicInteger _timeCheckCounter = new AtomicInteger();
        protected long _timeNow = System.currentTimeMillis();

        private TimeSupplier() {
        }

        /**
         * Counts the read and reports the current fake time.
         *
         * @return the current fake time in milliseconds
         */
        @Override
        public Long get() {
            _timeCheckCounter.getAndIncrement();
            return _timeNow;
        }

        /**
         * Replaces the fake time.
         *
         * @param j the new time in milliseconds
         */
        public void set(long j) {
            _timeNow = j;
        }

        /**
         * Moves the fake time by the given amount.
         *
         * @param j milliseconds to add to the current fake time
         */
        public void add(long j) {
            _timeNow = _timeNow + j;
        }
    }

    /**
     * Builds the default table config for these tests from the fixtures, wired to the fake
     * stream consumer factory and the fake message decoder.
     */
    private static TableConfig createTableConfig() throws Exception {
        String consumerFactoryClassName = FakeStreamConsumerFactory.class.getName();
        String decoderClassName = FakeStreamMessageDecoder.class.getName();
        return Fixtures.createTableConfig(consumerFactoryClassName, decoderClassName);
    }

    /**
     * Registers a lease extender for the table under the instance name {@code server-1} and
     * returns a mocked table data manager that reports that instance name and a mocked stats
     * history (estimated cardinality 200, estimated average column size 32 for any column).
     *
     * @param tableConfig supplies the table name used when registering the lease extender
     */
    private RealtimeTableDataManager createTableDataManager(TableConfig tableConfig) {
        final String serverInstance = "server-1";
        ServerMetrics leaseExtenderMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        SegmentBuildTimeLeaseExtender.getOrCreate(serverInstance, leaseExtenderMetrics, tableConfig.getTableName());

        RealtimeSegmentStatsHistory statsHistory = Mockito.mock(RealtimeSegmentStatsHistory.class);
        Mockito.when(statsHistory.getEstimatedCardinality(Mockito.anyString())).thenReturn(200);
        Mockito.when(statsHistory.getEstimatedAvgColSize(Mockito.anyString())).thenReturn(32);

        RealtimeTableDataManager tableDataManager = Mockito.mock(RealtimeTableDataManager.class);
        Mockito.when(tableDataManager.getServerInstance()).thenReturn(serverInstance);
        Mockito.when(tableDataManager.getStatsHistory()).thenReturn(statsHistory);
        return tableDataManager;
    }

    /**
     * Creates ZK metadata for the shared test segment: start offset {@code START_OFFSET},
     * creation time "now", status {@code IN_PROGRESS}.
     */
    private SegmentZKMetadata createZkMetadata() {
        String startOffset = START_OFFSET.toString();
        SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
        zkMetadata.setStartOffset(startOffset);
        zkMetadata.setCreationTime(System.currentTimeMillis());
        zkMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
        return zkMetadata;
    }

    /**
     * Creates a fake segment manager with all defaults: upsert config left untouched, a fresh
     * fake clock, no flush-threshold overrides and the default table config.
     */
    private FakeLLRealtimeSegmentDataManager createFakeSegmentManager() throws Exception {
        TimeSupplier freshClock = new TimeSupplier();
        return createFakeSegmentManager(false, freshClock, null, null, null);
    }

    /**
     * Creates a fake segment manager for the shared test segment.
     *
     * @param z when {@code true}, the upsert config of the table config is cleared
     * @param timeSupplier fake clock handed to the manager
     * @param str if non-null, stored as the {@code realtime.segment.flush.threshold.rows} stream config
     * @param str2 if non-null, stored as the {@code realtime.segment.flush.threshold.time} stream config
     * @param tableConfig table config to use (it is modified in place by the options above);
     *        when {@code null} a default one is created
     */
    private FakeLLRealtimeSegmentDataManager createFakeSegmentManager(boolean z, TimeSupplier timeSupplier, @Nullable String str, @Nullable String str2, @Nullable TableConfig tableConfig) throws Exception {
        SegmentZKMetadata zkMetadata = createZkMetadata();
        TableConfig effectiveConfig = tableConfig != null ? tableConfig : createTableConfig();
        if (z) {
            effectiveConfig.setUpsertConfig((UpsertConfig) null);
        }
        if (str != null) {
            effectiveConfig.getIndexingConfig().getStreamConfigs().put("realtime.segment.flush.threshold.rows", str);
        }
        if (str2 != null) {
            effectiveConfig.getIndexingConfig().getStreamConfigs().put("realtime.segment.flush.threshold.time", str2);
        }
        RealtimeTableDataManager tableDataManager = createTableDataManager(effectiveConfig);
        LLCSegmentName segmentName = new LLCSegmentName(SEGMENT_NAME_STR);
        // Only the first manager created by this test instance installs the partition's semaphore.
        _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1));
        Schema schema = Fixtures.createSchema();
        ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
        return new FakeLLRealtimeSegmentDataManager(zkMetadata, effectiveConfig, tableDataManager, SEGMENT_DIR, schema, segmentName, _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
    }

    /**
     * One-time setup: marks the scratch segment directory for deletion when the JVM exits and
     * initializes the lease extender's executor before any test runs.
     */
    @BeforeClass
    public void setUp() {
        // Safety net in case tearDown() never runs.
        SEGMENT_DIR_FILE.deleteOnExit();
        SegmentBuildTimeLeaseExtender.initExecutor();
    }

    /**
     * One-time cleanup: removes the scratch segment directory (ignoring failures) and shuts
     * down the lease extender's executor started in setUp().
     */
    @AfterClass
    public void tearDown() {
        FileUtils.deleteQuietly(SEGMENT_DIR_FILE);
        SegmentBuildTimeLeaseExtender.shutdownExecutor();
    }

    /**
     * Checks that {@code extractOffset} yields offset 34 from a controller response that carries
     * both {@code streamPartitionMsgOffset} and {@code offset}, only {@code offset}, or only
     * {@code streamPartitionMsgOffset}.
     */
    @Test
    public void testOffsetParsing() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
        try {
            LongMsgOffset expectedOffset = new LongMsgOffset("34");

            // Both the string offset and the legacy numeric offset are present.
            SegmentCompletionProtocol.Response bothFields = SegmentCompletionProtocol.Response.fromJsonString("{  \"streamPartitionMsgOffset\" : \"34\",  \"offset\" : 34,  \"buildTimeSec\" : -1,  \"isSplitCommitType\" : false,  \"segmentLocation\" : \"file:///a/b\",  \"status\" : \"CATCH_UP\"}");
            // compareTo(...) == 0 means "same offset"; the expected value is the literal 0, not a partition id.
            Assert.assertEquals(segmentDataManager.extractOffset(bothFields).compareTo(expectedOffset), 0);

            // Only the numeric offset is present.
            SegmentCompletionProtocol.Response numericOnly = SegmentCompletionProtocol.Response.fromJsonString("{  \"offset\" : 34,  \"buildTimeSec\" : -1,  \"isSplitCommitType\" : false,  \"segmentLocation\" : \"file:///a/b\",  \"status\" : \"CATCH_UP\"}");
            Assert.assertEquals(segmentDataManager.extractOffset(numericOnly).compareTo(expectedOffset), 0);

            // Only the string offset is present.
            SegmentCompletionProtocol.Response stringOnly = SegmentCompletionProtocol.Response.fromJsonString("{  \"streamPartitionMsgOffset\" : \"34\",  \"buildTimeSec\" : -1,  \"isSplitCommitType\" : false,  \"segmentLocation\" : \"file:///a/b\",  \"status\" : \"CATCH_UP\"}");
            Assert.assertEquals(segmentDataManager.extractOffset(stringOnly).compareTo(expectedOffset), 0);
        } finally {
            // Tear the manager down even when an assertion fails, so a failure here cannot leave
            // state behind for later tests (the per-partition semaphore map is shared).
            segmentDataManager.destroy();
        }
    }

    /**
     * Scripts one consumed offset followed by 100 HOLD responses and checks that the consumer
     * drains both queues, never builds, commits or downloads, and ends in state HOLDING.
     */
    @Test
    public void testHolding() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset endOffset = new LongMsgOffset(698L);
            segmentDataManager._consumeOffsets.add(endOffset);
            SegmentCompletionProtocol.Response holdResponse = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(endOffset.toString()));
            // The same HOLD response is queued 100 times; the fake stops the loop once the queue is empty.
            for (int i = 0; i < 100; i++) {
                segmentDataManager._responses.add(holdResponse);
            }

            consumer.run();

            Assert.assertTrue(segmentDataManager._responses.isEmpty());
            Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
            Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
            Assert.assertFalse(segmentDataManager._buildSegmentCalled);
            Assert.assertFalse(segmentDataManager._commitSegmentCalled);
            Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.HOLDING);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Scripts one consumed offset, then a HOLD followed by a COMMIT at that offset, and checks
     * that the segment is built and committed (no build-and-replace, no download) and the
     * manager ends in state COMMITTED.
     */
    @Test
    public void testCommitAfterHold() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset endOffset = new LongMsgOffset(698L);
            segmentDataManager._consumeOffsets.add(endOffset);
            SegmentCompletionProtocol.Response holdResponse = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
            SegmentCompletionProtocol.Response commitResponse = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
            segmentDataManager._responses.add(holdResponse);
            segmentDataManager._responses.add(commitResponse);

            consumer.run();

            Assert.assertTrue(segmentDataManager._responses.isEmpty());
            Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
            Assert.assertTrue(segmentDataManager._buildSegmentCalled);
            Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
            Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
            Assert.assertTrue(segmentDataManager._commitSegmentCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Scripts a COMMIT response while the fake is told to fail the segment build, and checks
     * that the build was attempted and the manager ends in state ERROR.
     */
    @Test
    public void testSegmentBuildException() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset endOffset = new LongMsgOffset(698L);
            segmentDataManager._consumeOffsets.add(endOffset);
            segmentDataManager._responses.add(new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(endOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)));
            // Makes the fake's buildSegmentInternal() return null.
            segmentDataManager._failSegmentBuild = true;

            consumer.run();

            Assert.assertTrue(segmentDataManager._buildSegmentCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.ERROR);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Scripts two consumed offsets (698, then 708) with the responses HOLD at 698, CATCH_UP to
     * 708, HOLD at 708 and COMMIT at 708, and checks that the segment is built and committed
     * (no build-and-replace, no download) and the manager ends in state COMMITTED.
     */
    @Test
    public void testCommitAfterCatchup() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset firstOffset = new LongMsgOffset(698L);
            LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10);
            segmentDataManager._consumeOffsets.add(firstOffset);
            segmentDataManager._consumeOffsets.add(catchupOffset);
            SegmentCompletionProtocol.Response holdAtFirst = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(firstOffset.toString()));
            SegmentCompletionProtocol.Response catchUp = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP).withStreamPartitionMsgOffset(catchupOffset.toString()));
            SegmentCompletionProtocol.Response holdAtCatchup = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
            SegmentCompletionProtocol.Response commit = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
            segmentDataManager._responses.add(holdAtFirst);
            segmentDataManager._responses.add(catchUp);
            segmentDataManager._responses.add(holdAtCatchup);
            segmentDataManager._responses.add(commit);

            consumer.run();

            Assert.assertTrue(segmentDataManager._responses.isEmpty());
            Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
            Assert.assertTrue(segmentDataManager._buildSegmentCalled);
            Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
            Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
            Assert.assertTrue(segmentDataManager._commitSegmentCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Same HOLD / CATCH_UP / HOLD / COMMIT script as the plain catch-up test, but with the
     * stream's {@code consumer.prop.auto.offset.reset} property set to the period string
     * {@code 2d}. Expects the segment to be built and committed and the state to be COMMITTED.
     */
    @Test
    public void testCommitAfterCatchupWithPeriodOffset() throws Exception {
        TableConfig tableConfig = createTableConfig();
        tableConfig.getIndexingConfig().getStreamConfigs().put(StreamConfigProperties.constructStreamProperty("consumer.prop.auto.offset.reset", "fakeStream"), "2d");
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null, null, tableConfig);
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset firstOffset = new LongMsgOffset(698L);
            LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10);
            segmentDataManager._consumeOffsets.add(firstOffset);
            segmentDataManager._consumeOffsets.add(catchupOffset);
            SegmentCompletionProtocol.Response holdAtFirst = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(firstOffset.toString()));
            SegmentCompletionProtocol.Response catchUp = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP).withStreamPartitionMsgOffset(catchupOffset.toString()));
            SegmentCompletionProtocol.Response holdAtCatchup = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
            SegmentCompletionProtocol.Response commit = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
            segmentDataManager._responses.add(holdAtFirst);
            segmentDataManager._responses.add(catchUp);
            segmentDataManager._responses.add(holdAtCatchup);
            segmentDataManager._responses.add(commit);

            consumer.run();

            Assert.assertTrue(segmentDataManager._responses.isEmpty());
            Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
            Assert.assertTrue(segmentDataManager._buildSegmentCalled);
            Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
            Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
            Assert.assertTrue(segmentDataManager._commitSegmentCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Same HOLD / CATCH_UP / HOLD / COMMIT script as the plain catch-up test, but with the
     * stream's {@code consumer.prop.auto.offset.reset} property set to the current instant in
     * ISO-8601 form. Expects the segment to be built and committed and the state to be COMMITTED.
     */
    @Test
    public void testCommitAfterCatchupWithTimestampOffset() throws Exception {
        TableConfig tableConfig = createTableConfig();
        tableConfig.getIndexingConfig().getStreamConfigs().put(StreamConfigProperties.constructStreamProperty("consumer.prop.auto.offset.reset", "fakeStream"), Instant.now().toString());
        FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(false, new TimeSupplier(), null, null, tableConfig);
        try {
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
            LongMsgOffset firstOffset = new LongMsgOffset(698L);
            LongMsgOffset catchupOffset = new LongMsgOffset(firstOffset.getOffset() + 10);
            segmentDataManager._consumeOffsets.add(firstOffset);
            segmentDataManager._consumeOffsets.add(catchupOffset);
            SegmentCompletionProtocol.Response holdAtFirst = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withStreamPartitionMsgOffset(firstOffset.toString()));
            SegmentCompletionProtocol.Response catchUp = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP).withStreamPartitionMsgOffset(catchupOffset.toString()));
            SegmentCompletionProtocol.Response holdAtCatchup = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
            SegmentCompletionProtocol.Response commit = new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStreamPartitionMsgOffset(catchupOffset.toString()).withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT));
            segmentDataManager._responses.add(holdAtFirst);
            segmentDataManager._responses.add(catchUp);
            segmentDataManager._responses.add(holdAtCatchup);
            segmentDataManager._responses.add(commit);

            consumer.run();

            Assert.assertTrue(segmentDataManager._responses.isEmpty());
            Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
            Assert.assertTrue(segmentDataManager._buildSegmentCalled);
            Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
            Assert.assertFalse(segmentDataManager._downloadAndReplaceCalled);
            Assert.assertTrue(segmentDataManager._commitSegmentCalled);
            Assert.assertEquals(segmentDataManager._state.get(segmentDataManager), LLRealtimeSegmentDataManager.State.COMMITTED);
        } finally {
            // Tear the manager down even when an assertion fails.
            segmentDataManager.destroy();
        }
    }

    /**
     * Runs the partition consumer against a single queued controller response carrying the {@code DISCARD}
     * status and checks that no build, replace, download or commit step was triggered and that the manager
     * ends up in {@code State.DISCARDED}.
     */
    @Test
    public void testDiscarded() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager();
        LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentManager.createPartitionConsumer();

        // Queue one offset and one controller response at that same offset; both queues must be drained by run().
        LongMsgOffset discardOffset = new LongMsgOffset(698L);
        segmentManager._consumeOffsets.add(discardOffset);
        SegmentCompletionProtocol.Response.Params discardParams = new SegmentCompletionProtocol.Response.Params()
                .withStreamPartitionMsgOffset(discardOffset.toString())
                .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.DISCARD);
        segmentManager._responses.add(new SegmentCompletionProtocol.Response(discardParams));

        consumer.run();

        Assert.assertTrue(segmentManager._responses.isEmpty());
        Assert.assertTrue(segmentManager._consumeOffsets.isEmpty());
        // None of the segment completion actions may have been invoked.
        Assert.assertFalse(segmentManager._buildSegmentCalled);
        Assert.assertFalse(segmentManager._buildAndReplaceCalled);
        Assert.assertFalse(segmentManager._downloadAndReplaceCalled);
        Assert.assertFalse(segmentManager._commitSegmentCalled);
        Assert.assertEquals(segmentManager._state.get(segmentManager), LLRealtimeSegmentDataManager.State.DISCARDED);
        segmentManager.destroy();
    }

    /**
     * Runs the partition consumer against a single queued controller response carrying the {@code KEEP}
     * status and checks that only the build-and-replace step was triggered and that the manager ends up in
     * {@code State.RETAINED}.
     */
    @Test
    public void testRetained() throws Exception {
        FakeLLRealtimeSegmentDataManager manager = createFakeSegmentManager();
        LLRealtimeSegmentDataManager.PartitionConsumer partitionConsumer = manager.createPartitionConsumer();

        LongMsgOffset keepOffset = new LongMsgOffset(698L);
        manager._consumeOffsets.add(keepOffset);
        manager._responses.add(new SegmentCompletionProtocol.Response(
                new SegmentCompletionProtocol.Response.Params()
                        .withStreamPartitionMsgOffset(keepOffset.toString())
                        .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP)));

        partitionConsumer.run();

        // Both queues are expected to be fully consumed by run().
        Assert.assertTrue(manager._responses.isEmpty());
        Assert.assertTrue(manager._consumeOffsets.isEmpty());
        // Build-and-replace is the only completion action expected here.
        Assert.assertFalse(manager._buildSegmentCalled);
        Assert.assertFalse(manager._downloadAndReplaceCalled);
        Assert.assertTrue(manager._buildAndReplaceCalled);
        Assert.assertFalse(manager._commitSegmentCalled);
        Assert.assertEquals(manager._state.get(manager), LLRealtimeSegmentDataManager.State.RETAINED);
        manager.destroy();
    }

    /**
     * Feeds the partition consumer a long run of identical {@code NOT_LEADER} controller responses and checks
     * that all of them are consumed, that no completion action is triggered, and that the manager is left in
     * {@code State.HOLDING}.
     */
    @Test
    public void testNotLeader() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager();
        LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentManager.createPartitionConsumer();

        LongMsgOffset heldOffset = new LongMsgOffset(698L);
        segmentManager._consumeOffsets.add(heldOffset);

        SegmentCompletionProtocol.Response.Params notLeaderParams = new SegmentCompletionProtocol.Response.Params()
                .withStreamPartitionMsgOffset(heldOffset.toString())
                .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);
        SegmentCompletionProtocol.Response notLeader = new SegmentCompletionProtocol.Response(notLeaderParams);

        // The same response instance is queued repeatedly.
        // NOTE(review): the counter starts at PARTITION_GROUP_ID rather than a literal, so the number of queued
        // responses depends on that constant - confirm this is intended.
        for (int queued = PARTITION_GROUP_ID; queued < 100; queued++) {
            segmentManager._responses.add(notLeader);
        }

        consumer.run();

        Assert.assertTrue(segmentManager._responses.isEmpty());
        Assert.assertTrue(segmentManager._consumeOffsets.isEmpty());
        Assert.assertFalse(segmentManager._buildAndReplaceCalled);
        Assert.assertFalse(segmentManager._buildSegmentCalled);
        Assert.assertFalse(segmentManager._commitSegmentCalled);
        Assert.assertFalse(segmentManager._downloadAndReplaceCalled);
        Assert.assertEquals(segmentManager._state.get(segmentManager), LLRealtimeSegmentDataManager.State.HOLDING);
        segmentManager.destroy();
    }

    /**
     * Sets the fake manager's {@code _throwExceptionFromConsume} flag, runs the partition consumer, and checks
     * that the {@code _postConsumeStoppedCalled} flag has been raised afterwards.
     */
    @Test
    public void testConsumingException() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager();
        LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentManager.createPartitionConsumer();

        segmentManager._throwExceptionFromConsume = true;
        // Reset explicitly so the assertion below can only pass because of this run.
        segmentManager._postConsumeStoppedCalled = false;

        consumer.run();

        Assert.assertTrue(segmentManager._postConsumeStoppedCalled);
        segmentManager.destroy();
    }

    /**
     * Exercises {@code goOnlineFromConsuming} from a range of starting states, using segment ZK metadata whose
     * end offset is 798, and checks for each state whether the manager downloads the segment, builds and
     * replaces it, or does neither.
     *
     * <p>Each scenario uses a fresh manager that is destroyed before the next one is created.
     */
    @Test
    public void testOnlineTransitionAfterStop() throws Exception {
        SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
        LongMsgOffset endOffset = new LongMsgOffset(798L);
        zkMetadata.setEndOffset(endOffset.toString());

        // COMMITTED: neither download nor build-and-replace is expected.
        {
            FakeLLRealtimeSegmentDataManager committed = createFakeSegmentManager();
            committed._stopWaitTimeMs = PARTITION_GROUP_ID;
            committed._state.set(committed, LLRealtimeSegmentDataManager.State.COMMITTED);
            committed.goOnlineFromConsuming(zkMetadata);
            Assert.assertFalse(committed._downloadAndReplaceCalled);
            Assert.assertFalse(committed._buildAndReplaceCalled);
            committed.destroy();
        }

        // RETAINED: neither download nor build-and-replace is expected.
        {
            FakeLLRealtimeSegmentDataManager retained = createFakeSegmentManager();
            retained._stopWaitTimeMs = PARTITION_GROUP_ID;
            retained._state.set(retained, LLRealtimeSegmentDataManager.State.RETAINED);
            retained.goOnlineFromConsuming(zkMetadata);
            Assert.assertFalse(retained._downloadAndReplaceCalled);
            Assert.assertFalse(retained._buildAndReplaceCalled);
            retained.destroy();
        }

        // DISCARDED: the segment must be downloaded.
        {
            FakeLLRealtimeSegmentDataManager discarded = createFakeSegmentManager();
            discarded._stopWaitTimeMs = PARTITION_GROUP_ID;
            discarded._state.set(discarded, LLRealtimeSegmentDataManager.State.DISCARDED);
            discarded.goOnlineFromConsuming(zkMetadata);
            Assert.assertTrue(discarded._downloadAndReplaceCalled);
            Assert.assertFalse(discarded._buildAndReplaceCalled);
            discarded.destroy();
        }

        // ERROR: the segment must be downloaded.
        {
            FakeLLRealtimeSegmentDataManager errored = createFakeSegmentManager();
            errored._stopWaitTimeMs = PARTITION_GROUP_ID;
            errored._state.set(errored, LLRealtimeSegmentDataManager.State.ERROR);
            errored.goOnlineFromConsuming(zkMetadata);
            Assert.assertTrue(errored._downloadAndReplaceCalled);
            Assert.assertFalse(errored._buildAndReplaceCalled);
            errored.destroy();
        }

        // HOLDING with the current offset (799) past the metadata end offset (798): download expected.
        {
            FakeLLRealtimeSegmentDataManager holdingPastEnd = createFakeSegmentManager();
            holdingPastEnd._stopWaitTimeMs = PARTITION_GROUP_ID;
            holdingPastEnd._state.set(holdingPastEnd, LLRealtimeSegmentDataManager.State.HOLDING);
            holdingPastEnd.setCurrentOffset(799L);
            holdingPastEnd.goOnlineFromConsuming(zkMetadata);
            Assert.assertTrue(holdingPastEnd._downloadAndReplaceCalled);
            Assert.assertFalse(holdingPastEnd._buildAndReplaceCalled);
            holdingPastEnd.destroy();
        }

        // CATCHING_UP with the current offset (799) past the metadata end offset (798): download expected.
        {
            FakeLLRealtimeSegmentDataManager catchingUpPastEnd = createFakeSegmentManager();
            catchingUpPastEnd._stopWaitTimeMs = PARTITION_GROUP_ID;
            catchingUpPastEnd._state.set(catchingUpPastEnd, LLRealtimeSegmentDataManager.State.CATCHING_UP);
            catchingUpPastEnd.setCurrentOffset(799L);
            catchingUpPastEnd.goOnlineFromConsuming(zkMetadata);
            Assert.assertTrue(catchingUpPastEnd._downloadAndReplaceCalled);
            Assert.assertFalse(catchingUpPastEnd._buildAndReplaceCalled);
            catchingUpPastEnd.destroy();
        }

        // CATCHING_UP with a queued consume offset of 797, short of the end offset: download expected.
        {
            FakeLLRealtimeSegmentDataManager catchingUpShort = createFakeSegmentManager();
            catchingUpShort._stopWaitTimeMs = PARTITION_GROUP_ID;
            catchingUpShort._state.set(catchingUpShort, LLRealtimeSegmentDataManager.State.CATCHING_UP);
            catchingUpShort._consumeOffsets.add(new LongMsgOffset(797L));
            catchingUpShort.goOnlineFromConsuming(zkMetadata);
            Assert.assertTrue(catchingUpShort._downloadAndReplaceCalled);
            Assert.assertFalse(catchingUpShort._buildAndReplaceCalled);
            catchingUpShort.destroy();
        }

        // CATCHING_UP with a queued consume offset equal to the end offset: build-and-replace expected.
        {
            FakeLLRealtimeSegmentDataManager catchingUpExact = createFakeSegmentManager();
            catchingUpExact._stopWaitTimeMs = PARTITION_GROUP_ID;
            catchingUpExact._state.set(catchingUpExact, LLRealtimeSegmentDataManager.State.CATCHING_UP);
            catchingUpExact._consumeOffsets.add(endOffset);
            catchingUpExact.goOnlineFromConsuming(zkMetadata);
            Assert.assertFalse(catchingUpExact._downloadAndReplaceCalled);
            Assert.assertTrue(catchingUpExact._buildAndReplaceCalled);
            catchingUpExact.destroy();
        }
    }

    /**
     * Checks the result of {@code invokeEndCriteriaReached()} for several manager states, varying the number
     * of indexed rows, the value of the fake time supplier, and the current/final offsets.
     *
     * <p>Each scenario uses a fresh manager that is destroyed before the next one is created.
     */
    @Test
    public void testEndCriteriaChecking() throws Exception {
        // INITIAL_CONSUMING: criteria flip to true once 250000 rows are indexed, with stop reason "rowLimit".
        {
            FakeLLRealtimeSegmentDataManager rowLimited = createFakeSegmentManager();
            rowLimited._state.set(rowLimited, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
            Assert.assertFalse(rowLimited.invokeEndCriteriaReached());
            rowLimited.setNumRowsIndexed(249999);
            Assert.assertFalse(rowLimited.invokeEndCriteriaReached());
            rowLimited.setNumRowsIndexed(250000);
            Assert.assertTrue(rowLimited.invokeEndCriteriaReached());
            Assert.assertEquals(rowLimited.getStopReason(), "rowLimit");
            rowLimited.destroy();
        }

        // INITIAL_CONSUMING: criteria stay false after the first time bump; they become true, with stop reason
        // "timeLimit", after the messages-fetched flag is set and one more hour is added.
        {
            FakeLLRealtimeSegmentDataManager timeLimited = createFakeSegmentManager();
            timeLimited._state.set(timeLimited, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
            Assert.assertFalse(timeLimited.invokeEndCriteriaReached());
            timeLimited._timeSupplier.add(64368001L);
            Assert.assertFalse(timeLimited.invokeEndCriteriaReached());
            setHasMessagesFetched(timeLimited, true);
            timeLimited._timeSupplier.add(TimeUnit.HOURS.toMillis(1L));
            Assert.assertTrue(timeLimited.invokeEndCriteriaReached());
            Assert.assertEquals(timeLimited.getStopReason(), "timeLimit");
            timeLimited.destroy();
        }

        // CATCHING_UP: criteria become true when the current offset reaches the final offset (298).
        {
            FakeLLRealtimeSegmentDataManager catchingUp = createFakeSegmentManager();
            catchingUp._state.set(catchingUp, LLRealtimeSegmentDataManager.State.CATCHING_UP);
            catchingUp.setFinalOffset(298L);
            catchingUp.setCurrentOffset(297L);
            Assert.assertFalse(catchingUp.invokeEndCriteriaReached());
            catchingUp.setCurrentOffset(298L);
            Assert.assertTrue(catchingUp.invokeEndCriteriaReached());
            catchingUp.destroy();
        }

        // CATCHING_UP after the time supplier has been bumped: the outcome still follows the offsets only.
        {
            FakeLLRealtimeSegmentDataManager catchingUpLate = createFakeSegmentManager();
            catchingUpLate._timeSupplier.add(64368000L);
            catchingUpLate._state.set(catchingUpLate, LLRealtimeSegmentDataManager.State.CATCHING_UP);
            catchingUpLate.setFinalOffset(298L);
            catchingUpLate.setCurrentOffset(297L);
            Assert.assertFalse(catchingUpLate.invokeEndCriteriaReached());
            catchingUpLate.setCurrentOffset(298L);
            Assert.assertTrue(catchingUpLate.invokeEndCriteriaReached());
            catchingUpLate.destroy();
        }

        // CONSUMING_TO_ONLINE with the consume end time still ahead: reaching the final offset ends consumption.
        {
            FakeLLRealtimeSegmentDataManager toOnlineByOffset = createFakeSegmentManager();
            toOnlineByOffset._timeSupplier.add(1L);
            toOnlineByOffset._state.set(toOnlineByOffset, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
            toOnlineByOffset.setConsumeEndTime(toOnlineByOffset._timeSupplier.get().longValue() + 10);
            toOnlineByOffset.setFinalOffset(298L);
            toOnlineByOffset.setCurrentOffset(297L);
            Assert.assertFalse(toOnlineByOffset.invokeEndCriteriaReached());
            toOnlineByOffset.setCurrentOffset(298L);
            Assert.assertTrue(toOnlineByOffset.invokeEndCriteriaReached());
            toOnlineByOffset.destroy();
        }

        // CONSUMING_TO_ONLINE with the current offset kept short of the final offset: the criteria become true
        // once the time supplier is set to the consume end time.
        {
            FakeLLRealtimeSegmentDataManager toOnlineByTime = createFakeSegmentManager();
            toOnlineByTime._state.set(toOnlineByTime, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
            long consumeEndTime = toOnlineByTime._timeSupplier.get().longValue() + 10;
            toOnlineByTime.setConsumeEndTime(consumeEndTime);
            toOnlineByTime.setFinalOffset(298L);
            toOnlineByTime.setCurrentOffset(297L);
            toOnlineByTime._timeSupplier.set(consumeEndTime - 1);
            Assert.assertFalse(toOnlineByTime.invokeEndCriteriaReached());
            toOnlineByTime._timeSupplier.set(consumeEndTime);
            Assert.assertTrue(toOnlineByTime.invokeEndCriteriaReached());
            toOnlineByTime.destroy();
        }
    }

    /**
     * Reflectively writes the {@code _hasMessagesFetched} field declared on {@link LLRealtimeSegmentDataManager}
     * for the given manager instance, bypassing the field's access modifier.
     *
     * @param fakeLLRealtimeSegmentDataManager the manager whose field is overwritten
     * @param z the value to store in the field
     * @throws Exception if the field cannot be found or written via reflection
     */
    private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager fakeLLRealtimeSegmentDataManager, boolean z) throws Exception {
        Field hasMessagesFetched = LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
        hasMessagesFetched.setAccessible(true);
        // Autoboxing supplies the Boolean that Field.set(Object, Object) requires.
        hasMessagesFetched.set(fakeLLRealtimeSegmentDataManager, z);
    }

    /**
     * Checks that a segment tar file built for a commit survives a failed commit, is returned again by the
     * next {@code invokeBuildForCommit} call without the build flag being raised, and no longer exists after
     * a successful commit.
     */
    @Test
    public void testReuseOfBuiltSegment() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager();

        // One Params instance is reused for both responses: the COMMIT_SUCCESS response is created first, then
        // the status is switched to FAILED for the second response. The FAILED response is queued first.
        SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params();
        responseParams.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        SegmentCompletionProtocol.Response commitSuccess = new SegmentCompletionProtocol.Response(responseParams);
        responseParams.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
        SegmentCompletionProtocol.Response commitFailed = new SegmentCompletionProtocol.Response(responseParams);
        segmentManager._responses.add(commitFailed);
        segmentManager._responses.add(commitSuccess);

        // First build creates the tar file; the first commit attempt fails and must leave the file in place.
        File builtTarFile = segmentManager.invokeBuildForCommit(50000L).getSegmentTarFile();
        Assert.assertNotNull(builtTarFile);
        Assert.assertTrue(segmentManager._buildSegmentCalled);
        Assert.assertFalse(segmentManager.invokeCommit());
        Assert.assertTrue(builtTarFile.exists());

        // Second build request: the build flag stays false and the same file is handed back.
        segmentManager._buildSegmentCalled = false;
        File reusedTarFile = segmentManager.invokeBuildForCommit(50000L).getSegmentTarFile();
        Assert.assertFalse(segmentManager._buildSegmentCalled);
        Assert.assertEquals(reusedTarFile, builtTarFile);
        Assert.assertTrue(builtTarFile.exists());

        // The second commit succeeds, after which the tar file is gone.
        Assert.assertTrue(segmentManager.invokeCommit());
        Assert.assertFalse(builtTarFile.exists());
        segmentManager.destroy();
    }

    /**
     * Checks that a segment tar file left behind by a failed commit no longer exists after the manager, placed
     * in {@code State.HOLDING}, goes online from consuming with metadata whose end offset equals the current
     * offset (798).
     */
    @Test
    public void testFileRemovedDuringOnlineTransition() throws Exception {
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager();

        SegmentCompletionProtocol.Response.Params failedParams = new SegmentCompletionProtocol.Response.Params();
        failedParams.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
        segmentManager._responses.add(new SegmentCompletionProtocol.Response(failedParams));
        segmentManager.setCurrentOffset(798L);

        // Build, then fail the commit: the tar file must still be on disk afterwards.
        File builtTarFile = segmentManager.invokeBuildForCommit(50000L).getSegmentTarFile();
        Assert.assertNotNull(builtTarFile);
        Assert.assertTrue(segmentManager._buildSegmentCalled);
        Assert.assertFalse(segmentManager.invokeCommit());
        Assert.assertTrue(builtTarFile.exists());

        // Go online from HOLDING with an end offset matching the current offset.
        SegmentZKMetadata zkMetadata = new SegmentZKMetadata(SEGMENT_NAME_STR);
        LongMsgOffset endOffset = new LongMsgOffset(798L);
        zkMetadata.setEndOffset(endOffset.toString());
        segmentManager._stopWaitTimeMs = PARTITION_GROUP_ID;
        segmentManager._state.set(segmentManager, LLRealtimeSegmentDataManager.State.HOLDING);
        segmentManager.goOnlineFromConsuming(zkMetadata);

        Assert.assertFalse(builtTarFile.exists());
        segmentManager.destroy();
    }

    /**
     * Checks that two segment managers created through {@code createFakeSegmentManager()} share one consumer
     * semaphore, and that the second one only finishes construction after the first has been destroyed.
     *
     * <p>The second manager is created on a background thread. The test waits until a thread is queued on the
     * semaphore, destroys the first manager, and then waits for the second manager to appear.
     */
    @Test
    public void testOnlyOneSegmentHoldingTheSemaphoreForParticularPartition() throws Exception {
        final long timeoutMs = 10000;

        FakeLLRealtimeSegmentDataManager firstManager = createFakeSegmentManager();
        Assert.assertTrue(firstManager.getAcquiredConsumerSemaphore().get());
        Semaphore firstSemaphore = firstManager.getPartitionGroupConsumerSemaphore();
        // NOTE(review): the permit count is compared against PARTITION_GROUP_ID here and below, while the final
        // assertion uses the literal 1 - confirm the constant is meant to stand for "no permits available".
        Assert.assertEquals(firstSemaphore.availablePermits(), PARTITION_GROUP_ID);
        Assert.assertFalse(firstSemaphore.hasQueuedThreads());

        // Typed holder for the manager built on the background thread; stays null until construction returns.
        AtomicReference<FakeLLRealtimeSegmentDataManager> secondManagerRef = new AtomicReference<>(null);
        new Thread(() -> {
            try {
                secondManagerRef.set(createFakeSegmentManager());
            } catch (Exception e) {
                throw new RuntimeException("Exception when sleeping for " + timeoutMs + "ms", e);
            }
        }).start();

        // Once a thread is queued on the semaphore, destroy the first manager.
        TestUtils.waitForCondition(ignored -> {
            if (!firstSemaphore.hasQueuedThreads()) {
                return false;
            }
            firstManager.destroy();
            return true;
        }, timeoutMs, "Failed to wait for the second segment blocked on semaphore");

        TestUtils.waitForCondition(ignored -> secondManagerRef.get() != null, timeoutMs,
                "Failed to acquire the semaphore for the second segment manager in " + timeoutMs + "ms");

        FakeLLRealtimeSegmentDataManager secondManager = secondManagerRef.get();
        Assert.assertTrue(secondManager.getAcquiredConsumerSemaphore().get());
        Semaphore secondSemaphore = secondManager.getPartitionGroupConsumerSemaphore();
        Assert.assertEquals(firstSemaphore, secondSemaphore);
        Assert.assertEquals(secondSemaphore.availablePermits(), PARTITION_GROUP_ID);
        Assert.assertFalse(secondSemaphore.hasQueuedThreads());

        // Destroying the first manager a second time must leave the permit count unchanged.
        firstManager.destroy();
        Assert.assertEquals(firstManager.getPartitionGroupConsumerSemaphore().availablePermits(), PARTITION_GROUP_ID);

        // After the second manager is destroyed exactly one permit is available.
        secondManager.destroy();
        Assert.assertEquals(secondManager.getPartitionGroupConsumerSemaphore().availablePermits(), 1);
    }

    /**
     * Starts and shuts down a table data manager obtained from {@code TableDataManagerProvider}, using mocked
     * configuration and Helix collaborators, and checks that {@code SegmentBuildTimeLeaseExtender} does not
     * report its executor as shut down afterwards.
     */
    @Test
    public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor() throws Exception {
        TableConfig tableConfig = createTableConfig();
        tableConfig.setUpsertConfig((UpsertConfig) null);

        // Property store mock that returns the table config record for any lookup.
        ZkHelixPropertyStore propertyStore = Mockito.mock(ZkHelixPropertyStore.class);
        Mockito.when(propertyStore.get(Mockito.anyString(), (Stat) ArgumentMatchers.any(), ArgumentMatchers.anyInt()))
                .thenReturn(TableConfigUtils.toZNRecord(tableConfig));

        TableDataManagerConfig managerConfig = Mockito.mock(TableDataManagerConfig.class);
        Mockito.when(managerConfig.getTableDataManagerType()).thenReturn("REALTIME");
        Mockito.when(managerConfig.getTableName()).thenReturn(tableConfig.getTableName());
        Mockito.when(managerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());

        InstanceDataManagerConfig instanceConfig = Mockito.mock(InstanceDataManagerConfig.class);
        Mockito.when(instanceConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
        Mockito.when(instanceConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
        Mockito.when(instanceConfig.getMaxParallelSegmentDownloads()).thenReturn(-1);
        Mockito.when(instanceConfig.isStreamSegmentDownloadUntar()).thenReturn(false);
        TableDataManagerProvider.init(instanceConfig);

        ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
        HelixManager helixManager = Mockito.mock(HelixManager.class);
        TableDataManager tableDataManager = TableDataManagerProvider.getTableDataManager(managerConfig, "testInstance",
                propertyStore, serverMetrics, helixManager, (LoadingCache) null);

        tableDataManager.start();
        tableDataManager.shutDown();
        Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
    }

    /**
     * Runs the real consume loop ({@code _stubConsumeLoop = false}) with a time supplier that jumps 11 minutes
     * ahead after its 504th call, and checks the resulting time-check count, current offset and indexed
     * document counts.
     *
     * <p>The manager is destroyed in a {@code finally} block so that cleanup happens exactly once, whether the
     * consumer run or any assertion fails.
     */
    @Test
    public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached() throws Exception {
        TimeSupplier timeSupplier = new TimeSupplier() {
            @Override
            public Long get() {
                long now = System.currentTimeMillis();
                // The first 504 calls report the real time; later calls report a time 11 minutes ahead.
                if (this._timeCheckCounter.incrementAndGet() <= 504) {
                    return Long.valueOf(now);
                }
                return Long.valueOf(now + TimeUnit.MINUTES.toMillis(11L));
            }
        };
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager(true, timeSupplier, String.valueOf(1000), "10m", null);
        try {
            segmentManager._stubConsumeLoop = false;
            segmentManager._state.set(segmentManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentManager.createPartitionConsumer();

            LongMsgOffset endOffset = new LongMsgOffset(698L);
            segmentManager._consumeOffsets.add(endOffset);
            segmentManager._responses.add(new SegmentCompletionProtocol.Response(
                    new SegmentCompletionProtocol.Response.Params()
                            .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
                            .withStreamPartitionMsgOffset(endOffset.toString())));

            consumer.run();

            Assert.assertEquals(timeSupplier._timeCheckCounter.get(), 508);
            Assert.assertEquals(segmentManager.getCurrentOffset().getOffset(), 698L);
            Assert.assertEquals(segmentManager.getSegment().getNumDocsIndexed(), 500);
            Assert.assertEquals(segmentManager.getSegment().getSegmentMetadata().getTotalDocs(), 500);
        } finally {
            segmentManager.destroy();
        }
    }

    /**
     * Runs the real consume loop ({@code _stubConsumeLoop = false}) on a manager created with the arguments
     * {@code "500"} and {@code "10m"}, and checks the resulting time-check count, current offset and indexed
     * document counts.
     *
     * <p>The manager is destroyed in a {@code finally} block so that cleanup happens exactly once, whether the
     * consumer run or any assertion fails.
     */
    @Test
    public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached() throws Exception {
        TimeSupplier timeSupplier = new TimeSupplier();
        FakeLLRealtimeSegmentDataManager segmentManager = createFakeSegmentManager(true, timeSupplier, String.valueOf(500), "10m", null);
        try {
            segmentManager._stubConsumeLoop = false;
            segmentManager._state.set(segmentManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
            LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentManager.createPartitionConsumer();

            LongMsgOffset endOffset = new LongMsgOffset(698L);
            segmentManager._consumeOffsets.add(endOffset);
            segmentManager._responses.add(new SegmentCompletionProtocol.Response(
                    new SegmentCompletionProtocol.Response.Params()
                            .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
                            .withStreamPartitionMsgOffset(endOffset.toString())));

            consumer.run();

            Assert.assertEquals(timeSupplier._timeCheckCounter.get(), 506);
            Assert.assertEquals(segmentManager.getCurrentOffset().getOffset(), 698L);
            Assert.assertEquals(segmentManager.getSegment().getNumDocsIndexed(), 500);
            Assert.assertEquals(segmentManager.getSegment().getSegmentMetadata().getTotalDocs(), 500);
        } finally {
            segmentManager.destroy();
        }
    }
}
