package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.base.Preconditions;
import java.lang.reflect.Field;
import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.zookeeper.data.Stat;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.class */
public class SegmentCompletionTest {
    // Instance ids used for the three simulated replicas in the protocol requests.
    private static final String S_1 = "S1";
    private static final String S_2 = "S2";
    private static final String S_3 = "S3";
    // Mock segment manager handed to the completion manager under test.
    private MockPinotLLCRealtimeSegmentManager _segmentManager;
    // Completion manager under test; its clock is driven through its public _seconds field.
    private MockSegmentCompletionManager _segmentCompletionMgr;
    // Private "_fsmMap" of the completion manager, obtained reflectively during setup.
    private Map<String, Object> _fsmMap;
    // Private "_commitTimeMap" of the completion manager, obtained reflectively during setup.
    private Map<String, Long> _commitTimeMap;
    // Name of the segment every request in these tests refers to; regenerated by each setup.
    private String _segmentNameStr;
    private final String _tableName = "someTable";
    // Offsets reported by S1, S2 and S3 respectively; S2 holds the largest one.
    private final LongMsgOffset _s1Offset = new LongMsgOffset(20);
    private final LongMsgOffset _s2Offset = new LongMsgOffset(40);
    private final LongMsgOffset _s3Offset = new LongMsgOffset(30);

    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest$MockPinotLLCRealtimeSegmentManager.class */
    /**
     * Test double for {@link PinotLLCRealtimeSegmentManager} that keeps the segment metadata in a plain
     * field instead of fetching it, and records the arguments of the last "stopped consuming" call.
     */
    public static class MockPinotLLCRealtimeSegmentManager extends PinotLLCRealtimeSegmentManager {
        private static final ControllerConf CONTROLLER_CONF = new ControllerConf();

        /** Metadata returned by {@link #getSegmentZKMetadata} and mutated by {@link #commitSegmentMetadata}. */
        public SegmentZKMetadata _segmentMetadata;
        /** Completion manager whose clock is used to stamp the segment end time on commit. */
        public MockSegmentCompletionManager _segmentCompletionMgr;
        /** Segment name passed to the most recent {@link #segmentStoppedConsuming} call. */
        public LLCSegmentName _stoppedSegmentName;
        /** Second argument passed to the most recent {@link #segmentStoppedConsuming} call. */
        public String _stoppedInstance;
        public HelixManager _helixManager;

        /** Creates the mock with a freshly created {@link ControllerMetrics} instance. */
        protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager) {
            this(pinotHelixResourceManager, new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()));
        }

        /** Creates the mock with the given metrics and a Mockito-mocked {@link HelixManager}. */
        protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
            super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics);
            _helixManager = Mockito.mock(HelixManager.class);
        }

        /** Ignores all arguments and returns whatever metadata the test installed. */
        @Override
        public SegmentZKMetadata getSegmentZKMetadata(String str, String str2, Stat stat) {
            return _segmentMetadata;
        }

        /** Marks the stored metadata DONE and copies end offset, download URL and end time into it. */
        @Override
        public void commitSegmentMetadata(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
            SegmentZKMetadata metadata = _segmentMetadata;
            metadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
            metadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
            metadata.setDownloadUrl(committingSegmentDescriptor.getSegmentLocation());
            metadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs());
        }

        /** Fails the state check when the segment location is the sentinel "doNotCommitMe". */
        @Override
        public void commitSegmentFile(String str, CommittingSegmentDescriptor committingSegmentDescriptor) {
            String location = committingSegmentDescriptor.getSegmentLocation();
            Preconditions.checkState(!location.equals("doNotCommitMe"));
        }

        /** Records both arguments so that tests can assert on them afterwards. */
        @Override
        public void segmentStoppedConsuming(LLCSegmentName lLCSegmentName, String str) {
            _stoppedSegmentName = lLCSegmentName;
            _stoppedInstance = str;
        }
    }

    /* loaded from: input_file:org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest$MockSegmentCompletionManager.class */
    /**
     * Test double for {@link SegmentCompletionManager} with a manually driven clock, a fixed leadership
     * answer and a {@link LongMsgOffsetFactory} for every segment.
     */
    public static class MockSegmentCompletionManager extends SegmentCompletionManager {
        /** Current time in seconds; tests advance it by hand. */
        public long _seconds;
        private boolean _isLeader;

        /**
         * Builds the Helix manager through {@code createMockHelixManager(z, z2)} and delegates.
         *
         * @param z value later returned by {@link #isLeader(String)}
         * @param z2 second flag forwarded to {@code createMockHelixManager}
         */
        protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, boolean z, boolean z2) {
            this(SegmentCompletionTest.createMockHelixManager(z, z2), pinotLLCRealtimeSegmentManager, z);
        }

        /** Delegates with a freshly created {@link ControllerMetrics} instance. */
        protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, boolean z) {
            this(helixManager, pinotLLCRealtimeSegmentManager, z, new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()));
        }

        /** Wires the superclass with a lead-controller manager for "localhost_1234" and the default max commit time. */
        protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager, boolean z, ControllerMetrics controllerMetrics) {
            super(
                helixManager,
                pinotLLCRealtimeSegmentManager,
                controllerMetrics,
                new LeadControllerManager("localhost_1234", helixManager, controllerMetrics),
                SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
            _isLeader = z;
        }

        /** Always hands out a new {@link LongMsgOffsetFactory}, regardless of the segment. */
        @Override
        protected StreamPartitionMsgOffsetFactory getStreamPartitionMsgOffsetFactory(LLCSegmentName lLCSegmentName) {
            return new LongMsgOffsetFactory();
        }

        /** Converts the manually driven {@link #_seconds} counter to milliseconds. */
        @Override
        protected long getCurrentTimeMs() {
            return 1000L * _seconds;
        }

        /** Returns the leadership flag fixed at construction time, whatever the argument. */
        @Override
        protected boolean isLeader(String str) {
            return _isLeader;
        }
    }

    /**
     * Default per-test setup: rebuilds the mock managers by calling
     * {@link #testCaseSetup(boolean, boolean)} with both flags set to {@code true}.
     */
    @BeforeMethod
    public void testCaseSetup() throws Exception {
        testCaseSetup(true, true);
    }

    /**
     * Returns a new offset equal to the given offset shifted by {@code j} (which may be negative).
     */
    private StreamPartitionMsgOffset getModifiedLongOffset(LongMsgOffset longMsgOffset, long j) {
        long shifted = longMsgOffset.getOffset() + j;
        return new LongMsgOffset(shifted);
    }

    /**
     * Asserts that the stream offset carried in a controller response equals the expected offset.
     *
     * @param response controller response whose stream partition offset string is parsed as a long offset
     * @param streamPartitionMsgOffset expected offset; must be a {@link LongMsgOffset}
     */
    private void verifyOffset(SegmentCompletionProtocol.Response response, StreamPartitionMsgOffset streamPartitionMsgOffset) {
        LongMsgOffset responseOffset = new LongMsgOffset(response.getStreamPartitionMsgOffset());
        // A single comparison is sufficient; the failure message makes a mismatch readable in the report.
        Assert.assertEquals(responseOffset.compareTo((LongMsgOffset) streamPartitionMsgOffset), 0,
            "Response offset " + responseOffset + " does not match expected offset " + streamPartitionMsgOffset);
    }

    /**
     * Rebuilds the mock segment manager and completion manager, installs IN_PROGRESS metadata with three
     * replicas for a newly named segment, and captures the completion manager's private
     * {@code _fsmMap} and {@code _commitTimeMap} through reflection.
     *
     * @param z first flag forwarded to {@code createMockHelixManager}; also the value the mock completion
     *          manager returns from {@code isLeader}
     * @param z2 second flag forwarded to {@code createMockHelixManager}
     * @throws Exception if the reflective lookup of the private maps fails
     */
    public void testCaseSetup(boolean z, boolean z2) throws Exception {
        PinotHelixResourceManager resourceManager = Mockito.mock(PinotHelixResourceManager.class);
        // Build the Helix mock before opening the when(...) stubbing below: if the factory stubs its own
        // mock, doing that inside the thenReturn(...) argument would make Mockito report an unfinished stubbing.
        HelixManager helixManager = createMockHelixManager(z, z2);
        Mockito.when(resourceManager.getHelixZkManager()).thenReturn(helixManager);
        _segmentManager = new MockPinotLLCRealtimeSegmentManager(resourceManager);

        _segmentNameStr = new LLCSegmentName(_tableName, 23, 12, System.currentTimeMillis()).getSegmentName();
        SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(_segmentNameStr);
        segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
        segmentZKMetadata.setNumReplicas(3);
        _segmentManager._segmentMetadata = segmentZKMetadata;

        _segmentCompletionMgr = new MockSegmentCompletionManager(_segmentManager, z, z2);
        _segmentManager._segmentCompletionMgr = _segmentCompletionMgr;

        // The maps are private to SegmentCompletionManager, so the tests reach them reflectively.
        Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
        fsmMapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Object> fsmMap = (Map<String, Object>) fsmMapField.get(_segmentCompletionMgr);
        _fsmMap = fsmMap;

        Field commitTimeMapField = SegmentCompletionManager.class.getDeclaredField("_commitTimeMap");
        commitTimeMapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Long> commitTimeMap = (Map<String, Long>) commitTimeMapField.get(_segmentCompletionMgr);
        _commitTimeMap = commitTimeMap;
    }

    /**
     * Swaps in a brand-new completion manager (leader flag and second flag both {@code true}) that shares
     * the existing segment manager, carrying the current clock value over to it.
     *
     * <p>Every cached reference to the old manager is refreshed: the segment manager's back-reference
     * (used to stamp the end time on commit) and the reflectively captured private maps. Otherwise they
     * would keep observing the discarded instance and its frozen clock.
     *
     * @throws Exception if the reflective lookup of the private maps fails
     */
    private void replaceSegmentCompletionManager() throws Exception {
        long carriedSeconds = _segmentCompletionMgr._seconds;
        MockSegmentCompletionManager replacement = new MockSegmentCompletionManager(_segmentManager, true, true);
        replacement._seconds = carriedSeconds;
        _segmentCompletionMgr = replacement;
        _segmentManager._segmentCompletionMgr = replacement;

        Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
        fsmMapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Object> fsmMap = (Map<String, Object>) fsmMapField.get(replacement);
        _fsmMap = fsmMap;

        Field commitTimeMapField = SegmentCompletionManager.class.getDeclaredField("_commitTimeMap");
        commitTimeMapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Long> commitTimeMap = (Map<String, Long>) commitTimeMapField.get(replacement);
        _commitTimeMap = commitTimeMap;
    }

    /**
     * One replica (S3) reports that it stopped consuming after the other two replicas are already being
     * held; the test checks that the remaining replicas still drive the segment to a successful commit.
     */
    @Test
    public void testStoppedConsumeDuringCompletion() throws Exception {
        this._segmentCompletionMgr._seconds = 5L;
        // S1 reports offset 20 and is told to HOLD.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S2 reports offset 40 and is told to HOLD as well.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S3 reports that it stopped consuming at offset 30; the controller acknowledges with PROCESSED.
        Assert.assertEquals(this._segmentCompletionMgr.segmentStoppedConsuming(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s3Offset.toString()).withSegmentName(this._segmentNameStr).withReason("IAmLazy")).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        // The segment manager must have been notified about this segment and instance S3.
        Assert.assertEquals(new LLCSegmentName(this._segmentNameStr), this._segmentManager._stoppedSegmentName);
        Assert.assertEquals(S_3, this._segmentManager._stoppedInstance);
        // Clear the recorded notification.
        this._segmentManager._stoppedSegmentName = null;
        this._segmentManager._stoppedInstance = null;
        this._segmentCompletionMgr._seconds++;
        // S1 comes back still at offset 20 and is now told to CATCH_UP.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        this._segmentCompletionMgr._seconds++;
        // S2 comes back at offset 40 and is picked to COMMIT.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        this._segmentCompletionMgr._seconds++;
        // S3 now reports offset 40 and is told to HOLD while S2 commits.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S2 starts the commit.
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitStart(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
        // Five seconds later S2 finishes the commit.
        // NOTE(review): the two boolean arguments are presumably "success" and "split commit" - confirm against SegmentCompletionManager.
        this._segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params withSegmentLocation = new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr).withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitEnd(withSegmentLocation, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(withSegmentLocation)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        // After a successful commit no state machine entry remains for the segment.
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
        this._segmentCompletionMgr._seconds++;
        // A late report from S1 at the committed offset gets KEEP and does not re-create an entry.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
    }

    /**
     * One replica (S1) reports that it stopped consuming before any replica has been put on hold; the
     * test checks that the other two replicas still complete the commit.
     */
    @Test
    public void testStoppedConsumeBeforeHold() throws Exception {
        this._segmentCompletionMgr._seconds = 5L;
        // S1 reports that it stopped consuming at offset 20; the controller acknowledges with PROCESSED.
        Assert.assertEquals(this._segmentCompletionMgr.segmentStoppedConsuming(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr).withReason("IAmLazy")).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        // The segment manager must have been notified about this segment and instance S1.
        Assert.assertEquals(new LLCSegmentName(this._segmentNameStr), this._segmentManager._stoppedSegmentName);
        Assert.assertEquals(S_1, this._segmentManager._stoppedInstance);
        // Clear the recorded notification.
        this._segmentManager._stoppedSegmentName = null;
        this._segmentManager._stoppedInstance = null;
        this._segmentCompletionMgr._seconds++;
        // S2 reports offset 40 and is told to HOLD.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S3 reports offset 30 and is told to CATCH_UP to offset 40.
        SegmentCompletionProtocol.Response segmentConsumed = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s3Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(segmentConsumed, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S3 returns at offset 40 and is told to HOLD; the response still carries offset 40.
        SegmentCompletionProtocol.Response segmentConsumed2 = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed2.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        verifyOffset(segmentConsumed2, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S2 comes back at offset 40 and is picked to COMMIT.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        this._segmentCompletionMgr._seconds++;
        // S2 starts the commit.
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitStart(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
        // Five seconds later S2 finishes the commit.
        this._segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params withSegmentLocation = new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr).withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitEnd(withSegmentLocation, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(withSegmentLocation)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        // After a successful commit no state machine entry remains for the segment.
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
        this._segmentCompletionMgr._seconds++;
        // A late report from S1 at the committed offset gets KEEP and does not re-create an entry.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
    }

    /**
     * S2 first reports that it stopped consuming; afterwards the regular happy-path scenario is replayed
     * starting one second later and must still pass.
     */
    @Test
    public void testHappyPathAfterStoppedConsuming() throws Exception {
        _segmentCompletionMgr._seconds = 5L;

        SegmentCompletionProtocol.Request.Params stoppedParams = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_2)
            .withStreamPartitionMsgOffset(_s2Offset.toString())
            .withSegmentName(_segmentNameStr)
            .withReason("some reason");
        SegmentCompletionProtocol.Response stoppedResponse = _segmentCompletionMgr.segmentStoppedConsuming(stoppedParams);
        Assert.assertEquals(stoppedResponse.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);

        // The segment manager must have seen the notification for this segment and instance S2.
        LLCSegmentName expectedSegment = new LLCSegmentName(_segmentNameStr);
        Assert.assertEquals(expectedSegment, _segmentManager._stoppedSegmentName);
        Assert.assertEquals(S_2, _segmentManager._stoppedInstance);

        // Clear the recorded notification before replaying the happy path.
        _segmentManager._stoppedSegmentName = null;
        _segmentManager._stoppedInstance = null;

        testHappyPath(6L);
    }

    /** Runs the happy-path scenario with the mock clock starting at 5 seconds. */
    @Test
    public void testHappyPath() throws Exception {
        testHappyPath(5L);
    }

    /**
     * Split-commit happy path where the committer reports the segment location "/local/file"; the
     * download URL stored in the metadata is expected to be an http URL under /segments/someTable/.
     */
    @Test
    public void testHappyPathSplitCommitWithLocalFS() throws Exception {
        // NOTE(review): the "null:null" host/port presumably stems from the default ControllerConf having neither configured - confirm.
        testHappyPathSplitCommit(5L, "/local/file", "http://null:null/segments/someTable/" + this._segmentNameStr);
    }

    /**
     * Split-commit happy path where the committer reports a "fakefs://" segment location; the download
     * URL stored in the metadata is expected to equal that location unchanged.
     */
    @Test
    public void testHappyPathSplitCommitWithDeepstore() throws Exception {
        testHappyPathSplitCommit(5L, "fakefs:///segment1", "fakefs:///segment1");
    }

    /**
     * Split-commit happy path where the committer reports a "peer://" segment location; the download
     * URL stored in the metadata is expected to equal that location unchanged.
     */
    @Test
    public void testHappyPathSplitCommitWithPeerDownloadScheme() throws Exception {
        testHappyPathSplitCommit(5L, "peer:///segment1", "peer:///segment1");
    }

    /**
     * With no segment metadata available from the mock segment manager, a "segment consumed" request
     * must be answered with FAILED.
     */
    @Test
    public void testExceptionInConsumedMessage() {
        // The mock returns this field from getSegmentZKMetadata, so the lookup now yields null.
        _segmentManager._segmentMetadata = null;
        _segmentCompletionMgr._seconds = 10L;

        SegmentCompletionProtocol.Request.Params consumedParams = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_1)
            .withStreamPartitionMsgOffset(_s1Offset.toString())
            .withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response response = _segmentCompletionMgr.segmentConsumed(consumedParams);

        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
    }

    /**
     * The first committer (S2) fails at commit end because the mock segment manager rejects the segment
     * location "doNotCommitMe"; afterwards a different replica (S3) is picked and commits successfully.
     */
    @Test
    public void testCommitSegmentFileFail() throws Exception {
        this._segmentCompletionMgr._seconds = 5L;
        // S1 (offset 20) and S2 (offset 40) report in and are both told to HOLD.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S3 reports offset 30 and is told to CATCH_UP to offset 40.
        SegmentCompletionProtocol.Response segmentConsumed = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s3Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(segmentConsumed, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S1 returns still at offset 20 and is likewise told to CATCH_UP to offset 40.
        SegmentCompletionProtocol.Response segmentConsumed2 = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed2.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(segmentConsumed2, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S2 returns at offset 40 and is picked to COMMIT.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        this._segmentCompletionMgr._seconds++;
        // S3 has caught up to offset 40 and is told to HOLD while S2 commits.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S2 starts the commit.
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitStart(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
        // S2 ends the commit with the location "doNotCommitMe", which the mock commitSegmentFile rejects,
        // so the controller answers FAILED.
        this._segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params withSegmentLocation = new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr).withSegmentLocation("doNotCommitMe");
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitEnd(withSegmentLocation, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(withSegmentLocation)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
        // The failed attempt leaves no state machine entry behind.
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
        this._segmentCompletionMgr._seconds++;
        // Completion starts over: S1 reports offset 40 and is told to HOLD.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        // Five seconds later S3 reports offset 40 and is picked to COMMIT this time.
        this._segmentCompletionMgr._seconds += 5;
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        this._segmentCompletionMgr._seconds++;
        // S2 is told to HOLD while S3 commits.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S3 starts and, five seconds later, successfully ends the commit.
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitStart(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
        this._segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params withSegmentLocation2 = new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr).withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitEnd(withSegmentLocation2, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(withSegmentLocation2)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
    }

    /**
     * Shared happy-path scenario for the split-commit tests: S2 (largest offset) commits, and the
     * download URL written to the segment metadata is checked afterwards.
     *
     * @param j initial value of the mock clock, in seconds
     * @param str segment location the committer reports at commit end
     * @param str2 download URL expected in the segment metadata after the commit
     */
    private void testHappyPathSplitCommit(long j, String str, String str2) throws Exception {
        this._segmentCompletionMgr._seconds = j;
        // S1 (offset 20) and S2 (offset 40) report in and are both told to HOLD.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S3 reports offset 30 and is told to CATCH_UP to offset 40.
        SegmentCompletionProtocol.Response segmentConsumed = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s3Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(segmentConsumed, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S1 returns still at offset 20 and is likewise told to CATCH_UP to offset 40.
        SegmentCompletionProtocol.Response segmentConsumed2 = this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s1Offset.toString()).withSegmentName(this._segmentNameStr));
        Assert.assertEquals(segmentConsumed2.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(segmentConsumed2, this._s2Offset);
        this._segmentCompletionMgr._seconds++;
        // S2 returns at offset 40 and is picked to COMMIT.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        this._segmentCompletionMgr._seconds++;
        // S3 has caught up to offset 40 and is told to HOLD while S2 commits.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_3).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        this._segmentCompletionMgr._seconds++;
        // S2 starts the commit.
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitStart(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
        // Five seconds later S2 ends the commit, reporting the caller-supplied segment location.
        this._segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params withSegmentLocation = new SegmentCompletionProtocol.Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr).withSegmentLocation(str);
        Assert.assertEquals(this._segmentCompletionMgr.segmentCommitEnd(withSegmentLocation, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(withSegmentLocation)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        // The mock ignores the lookup arguments and returns the metadata it holds; check its download URL.
        Assert.assertEquals(this._segmentManager.getSegmentZKMetadata(null, null, null).getDownloadUrl(), str2);
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
        this._segmentCompletionMgr._seconds++;
        // A late report from S1 at the committed offset gets KEEP and does not re-create an entry.
        Assert.assertEquals(this._segmentCompletionMgr.segmentConsumed(new SegmentCompletionProtocol.Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(this._s2Offset.toString()).withSegmentName(this._segmentNameStr)).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
        Assert.assertFalse(this._fsmMap.containsKey(this._segmentNameStr));
    }

    /**
     * All three replicas report the same offset (20), S3 is picked to commit, but at commit end S3
     * claims a different offset (30). The commit must be rejected with FAILED, and a subsequent report
     * from another replica must start a fresh completion round.
     */
    @Test
    public void testCommitDifferentOffsetSplitCommit() throws Exception {
        final String agreedOffset = _s1Offset.toString();
        _segmentCompletionMgr._seconds = 5L;

        // S1 and S2 report the agreed offset and are both told to HOLD.
        SegmentCompletionProtocol.Request.Params s1Consumed = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_1).withStreamPartitionMsgOffset(agreedOffset).withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response s1Response = _segmentCompletionMgr.segmentConsumed(s1Consumed);
        Assert.assertEquals(s1Response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        _segmentCompletionMgr._seconds++;
        SegmentCompletionProtocol.Request.Params s2Consumed = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_2).withStreamPartitionMsgOffset(agreedOffset).withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response s2Response = _segmentCompletionMgr.segmentConsumed(s2Consumed);
        Assert.assertEquals(s2Response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports the same offset and is picked to COMMIT at that offset.
        _segmentCompletionMgr._seconds++;
        SegmentCompletionProtocol.Request.Params s3Consumed = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_3).withStreamPartitionMsgOffset(agreedOffset).withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response s3Response = _segmentCompletionMgr.segmentConsumed(s3Consumed);
        Assert.assertEquals(s3Response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        verifyOffset(s3Response, _s1Offset);

        // S3 starts the commit with the agreed offset.
        _segmentCompletionMgr._seconds++;
        SegmentCompletionProtocol.Request.Params commitStart = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_3).withStreamPartitionMsgOffset(agreedOffset).withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response commitStartResponse = _segmentCompletionMgr.segmentCommitStart(commitStart);
        Assert.assertEquals(commitStartResponse.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);

        // Five seconds later S3 ends the commit claiming offset 30 instead; this must be rejected.
        _segmentCompletionMgr._seconds += 5;
        SegmentCompletionProtocol.Request.Params commitEnd = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_3)
            .withStreamPartitionMsgOffset(_s3Offset.toString())
            .withSegmentName(_segmentNameStr)
            .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        CommittingSegmentDescriptor descriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(commitEnd);
        SegmentCompletionProtocol.Response commitEndResponse = _segmentCompletionMgr.segmentCommitEnd(commitEnd, true, true, descriptor);
        Assert.assertEquals(commitEndResponse.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));

        // A new report from S2 is told to HOLD and re-creates the state machine entry.
        _segmentCompletionMgr._seconds++;
        SegmentCompletionProtocol.Request.Params s2Retry = new SegmentCompletionProtocol.Request.Params()
            .withInstanceId(S_2).withStreamPartitionMsgOffset(agreedOffset).withSegmentName(_segmentNameStr);
        SegmentCompletionProtocol.Response s2RetryResponse = _segmentCompletionMgr.segmentConsumed(s2Retry);
        Assert.assertEquals(s2RetryResponse.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Drives one complete segment-completion round with three servers and checks every controller response.
     *
     * <p>Asserted sequence: S1 and S2 are told to HOLD, S3 is told to CATCH_UP (the response offset is
     * checked against {@code _s2Offset}), S1 is told to CATCH_UP, S2 is told to COMMIT, S3 is told to HOLD,
     * S2's commit start/end return COMMIT_CONTINUE/COMMIT_SUCCESS, and a later report from S1 at
     * {@code _s2Offset} returns KEEP.
     *
     * @param j value written to the mock manager's {@code _seconds} field before the first request
     * @throws Exception if any call into the completion manager throws
     */
    public void testHappyPath(long j) throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = j;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports one second later.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports; it is asked to catch up and the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S1 comes back at its original offset and is asked to catch up.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // S2 comes back and is told to commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // S3 has reached _s2Offset and is held.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 starts the commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);

        // S2 ends the commit five seconds later; the FSM entry must be gone afterwards.
        _segmentCompletionMgr._seconds += 5;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        CommittingSegmentDescriptor descriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitEnd(params, true, false, descriptor).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));

        // A report from S1 after the commit returns KEEP and does not re-create the FSM entry.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Re-runs the fixture setup with {@code testCaseSetup(true, false)} and asserts that a
     * {@code segmentConsumed} request is answered with NOT_LEADER.
     *
     * <p>NOTE(review): the meaning of the two setup flags is not visible in this method; presumably the
     * second one marks the controller as not connected - confirm against {@code testCaseSetup}.
     *
     * @throws Exception if setup or the completion manager call throws
     */
    @Test
    public void testControllerNotConnected() throws Exception {
        testCaseSetup(true, false);
        _segmentCompletionMgr._seconds = 5L;

        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
        SegmentCompletionProtocol.Response response = _segmentCompletionMgr.segmentConsumed(params);

        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);
    }

    /**
     * Asserts that a single server reporting with {@code REASON_TIME_LIMIT} is answered with HOLD.
     *
     * @throws Exception if the completion manager call throws
     */
    @Test
    public void testWinnerOnTimeLimit() throws Exception {
        _segmentCompletionMgr._seconds = 10L;

        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
        SegmentCompletionProtocol.Response response = _segmentCompletionMgr.segmentConsumed(params);

        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
    }

    /**
     * Checks the responses when the first server reports with {@code REASON_ROW_LIMIT}.
     *
     * <p>Asserted sequence: S1 (row limit) gets COMMIT on its first report, S2 (row limit, same offset)
     * and S3 (time limit, {@code _s3Offset}) get HOLD, S1's commit start/end return
     * COMMIT_CONTINUE/COMMIT_SUCCESS, after which S2 gets KEEP and S3 gets DISCARD.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testWinnerOnRowLimit() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 10L;

        // S1 reports with the row-limit reason and is told to commit straight away.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // S2 reports the same offset with the row-limit reason.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports a different offset with the time-limit reason.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S1 starts the commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);

        // S1 ends the commit five seconds later.
        _segmentCompletionMgr._seconds += 5;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        CommittingSegmentDescriptor descriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitEnd(params, true, false, descriptor).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);

        // After the commit, S2 (at the committed offset) is told to keep its segment.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.KEEP);

        // S3 (at a different offset) is told to discard.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.DISCARD);
    }

    /**
     * Runs the delayed-server scenario with the flag set to {@code true}.
     *
     * @throws Exception if the shared scenario throws
     */
    @Test
    public void testDelayedServerSplitCommit() throws Exception {
        final boolean splitCommit = true;
        testDelayedServer(splitCommit);
    }

    /**
     * Runs the delayed-server scenario with the flag set to {@code false}.
     *
     * @throws Exception if the shared scenario throws
     */
    @Test
    public void testDelayedServer() throws Exception {
        final boolean splitCommit = false;
        testDelayedServer(splitCommit);
    }

    /**
     * Shared scenario in which S3 only reports after a committer has already been chosen.
     *
     * <p>Asserted sequence: S1 and S2 are held, S1 is held again, S2 is told to COMMIT after a further
     * three seconds, S3 then reports an offset derived from {@code _s2Offset} via
     * {@code getModifiedLongOffset(_s2Offset, 10L)} and is held, S2's commit start/end return
     * COMMIT_CONTINUE/COMMIT_SUCCESS, and a repeat of S3's report afterwards returns DISCARD.
     *
     * @param z forwarded as the third argument of {@code segmentCommitEnd}; presumably the split-commit
     *          flag, judging by the names of the calling tests - confirm against the manager's signature
     * @throws Exception if any call into the completion manager throws
     */
    public void testDelayedServer(boolean z) throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S1 reports again and is still held.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // Three seconds later S2 reports again and is told to commit.
        _segmentCompletionMgr._seconds += 3;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // S3 finally shows up with the modified offset; the offset is set on the params after construction.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withSegmentName(_segmentNameStr);
        params.withStreamPartitionMsgOffset(getModifiedLongOffset(_s2Offset, 10L).toString());
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 starts the commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);

        // S2 ends the commit; the FSM entry must be gone afterwards.
        _segmentCompletionMgr._seconds += 5;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        CommittingSegmentDescriptor descriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitEnd(params, true, z, descriptor).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));

        // S3 repeats its report after the commit and is told to discard; no FSM entry is created.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withSegmentName(_segmentNameStr);
        params.withStreamPartitionMsgOffset(getModifiedLongOffset(_s2Offset, 10L).toString());
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.DISCARD);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Checks what happens when the chosen committer goes quiet for a long time.
     *
     * <p>Asserted sequence: S1 and S2 are held, S3 is told to CATCH_UP (offset checked against
     * {@code _s2Offset}), S2 is told to COMMIT. The clock then jumps 3600 seconds; S2's next report
     * returns HOLD and the FSM map no longer contains the segment. A further report from S2 returns HOLD
     * and the FSM map contains the segment again; four seconds later S2 is told to COMMIT.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testDeadServers() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S2 reports again and is told to commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // An hour passes with no activity; the next report is held and the FSM entry is removed.
        _segmentCompletionMgr._seconds += 3600;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));

        // The same report again (clock unchanged) is held, and an FSM entry exists once more.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));

        // Four seconds later S2 is told to commit.
        _segmentCompletionMgr._seconds += 4;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
    }

    /**
     * Checks the responses when the server told to COMMIT never follows through.
     *
     * <p>Asserted sequence: S1 and S2 are held, S3 is told to CATCH_UP (offset checked against
     * {@code _s2Offset}), S2 is told to COMMIT, S1 is told to CATCH_UP. After a large clock jump, S1's
     * report at {@code _s2Offset} returns HOLD and the FSM map no longer contains the segment. S1 and
     * then S3 report at {@code _s2Offset} and are held; five seconds later S3 is told to COMMIT and the
     * FSM map contains the segment.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testCommitterFailure() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S2 reports again and is told to commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // S1 reports at its old offset and is asked to catch up.
        _segmentCompletionMgr._seconds += 3;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // Large clock jump with no commit from S2; the expression is kept exactly as the test defines it.
        _segmentCompletionMgr._seconds +=
                (SegmentCompletionProtocol.getMaxSegmentCommitTimeMs() * SegmentCompletionProtocol.MAX_HOLD_TIME_MS) / 1000;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));

        // S1 reports again at _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports at _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // Five seconds later S3 is told to commit and an FSM entry exists for the segment.
        _segmentCompletionMgr._seconds += 5;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Happy path in which the committer asks for extra build time twice before committing.
     *
     * <p>Asserted sequence: no commit time is recorded for the table at the start; S1/S2 are held, S3 and
     * S1 are told to CATCH_UP, S2 is told to COMMIT with a positive build time. One second before that
     * build time elapses (measured from the start time) S2 requests a 20-second extension, and again 19
     * seconds later; both return PROCESSED and the FSM entry stays. Fifteen seconds later the commit
     * start returns COMMIT_CONTINUE and the commit-time map holds the elapsed time since the start in
     * milliseconds; 55 seconds later the commit end returns COMMIT_SUCCESS and the FSM entry is removed.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testHappyPathSlowCommit() throws Exception {
        // Clock value (in seconds) at which the scenario starts.
        final long startTimeSeconds = 1509242466L;
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        String tableName = new LLCSegmentName(_segmentNameStr).getTableName();
        Assert.assertNull(_commitTimeMap.get(tableName));

        _segmentCompletionMgr._seconds = startTimeSeconds;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S1 reports again and is asked to catch up.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // S2 reports again and is told to commit; the response carries the allowed build time.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        long buildTimeSeconds = response.getBuildTimeSeconds();
        Assert.assertTrue(buildTimeSeconds > 0);

        // One second before the build time (counted from the start) runs out, S2 asks for 20 more seconds.
        _segmentCompletionMgr._seconds = (startTimeSeconds + buildTimeSeconds) - 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withExtraTimeSec(20);
        Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));

        // Nineteen seconds later S2 asks for another 20 seconds.
        _segmentCompletionMgr._seconds += 19;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withExtraTimeSec(20);
        Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));

        // Fifteen seconds later S2 starts the commit; the same params object is reused for the commit end.
        _segmentCompletionMgr._seconds += 15;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);

        // The recorded commit time is the elapsed time since the start, in milliseconds.
        long expectedCommitTimeMs = (_segmentCompletionMgr._seconds - startTimeSeconds) * 1000;
        Assert.assertEquals(_commitTimeMap.get(tableName).longValue(), expectedCommitTimeMs);

        // S2 ends the commit 55 seconds later; the FSM entry must be gone afterwards.
        _segmentCompletionMgr._seconds += 55;
        CommittingSegmentDescriptor descriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitEnd(params, true, false, descriptor).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Checks the case where the committer overruns its extended build time.
     *
     * <p>Asserted sequence: S1/S2 are held, S3 and S1 are told to CATCH_UP, S2 is told to COMMIT with a
     * positive build time. One second before that build time elapses S2 gets a 20-second extension
     * (PROCESSED, FSM entry kept). Twenty-five seconds later S2's commit start returns HOLD, the FSM
     * entry is gone, and no commit time is recorded for the table.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testFailedSlowCommit() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        String tableName = new LLCSegmentName(_segmentNameStr).getTableName();
        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S1 reports again and is asked to catch up.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // S2 reports again and is told to commit; the response carries the allowed build time.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        long buildTimeSeconds = response.getBuildTimeSeconds();
        Assert.assertTrue(buildTimeSeconds > 0);

        // One second before the build time (counted from t=5) runs out, S2 asks for 20 more seconds.
        _segmentCompletionMgr._seconds = (5 + buildTimeSeconds) - 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withExtraTimeSec(20);
        Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));

        // S2 only starts the commit 25 seconds later: it is held, the FSM entry is dropped,
        // and no commit time has been recorded for the table.
        _segmentCompletionMgr._seconds += 25;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
        Assert.assertFalse(_commitTimeMap.containsKey(tableName));
    }

    /**
     * Checks that build-time extensions are eventually refused.
     *
     * <p>Asserted sequence: S1/S2 are held, S3 and S1 are told to CATCH_UP, S2 is told to COMMIT with a
     * positive build time and gets a first 20-second extension. S2 then keeps requesting 20-second
     * extensions (each PROCESSED, FSM entry kept) while the clock plus 20 does not exceed
     * {@code 5 + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()}. The next request
     * returns FAILED and the FSM entry is removed.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testLeaseTooLong() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S1 reports again and is asked to catch up.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // S2 reports again and is told to commit; the response carries the allowed build time.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
        long buildTimeSeconds = response.getBuildTimeSeconds();
        Assert.assertTrue(buildTimeSeconds > 0);

        // One second before the build time (counted from t=5) runs out, S2 asks for 20 more seconds.
        _segmentCompletionMgr._seconds = (5 + buildTimeSeconds) - 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withExtraTimeSec(20);
        Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));

        // Keep extending in 20-second steps while the next step still fits under the overall limit.
        while (_segmentCompletionMgr._seconds + 20 <= 5 + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) {
            params = new SegmentCompletionProtocol.Request.Params()
                    .withInstanceId(S_2)
                    .withStreamPartitionMsgOffset(_s2Offset.toString())
                    .withSegmentName(_segmentNameStr)
                    .withExtraTimeSec(20);
            Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                    SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
            Assert.assertTrue(_fsmMap.containsKey(_segmentNameStr));
            _segmentCompletionMgr._seconds += 20;
        }

        // The next extension request is refused and the FSM entry is dropped.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr)
                .withExtraTimeSec(20);
        Assert.assertEquals(_segmentCompletionMgr.extendBuildTime(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
        Assert.assertFalse(_fsmMap.containsKey(_segmentNameStr));
    }

    /**
     * Checks the responses when the completion manager is replaced after a committer has been chosen.
     *
     * <p>Asserted sequence: S1/S2 are held, S3 and S1 are told to CATCH_UP, S2 is told to COMMIT. Then
     * {@code replaceSegmentCompletionManager()} is called (presumably simulating a controller failover -
     * confirm against that helper). Against the manager in place afterwards, S3 and S1 reporting at
     * {@code _s2Offset} are held, S2's commit start returns HOLD, and S2's next report returns COMMIT.
     *
     * @throws Exception if any call into the completion manager throws
     */
    @Test
    public void testControllerFailureDuringCommit() throws Exception {
        SegmentCompletionProtocol.Request.Params params;
        SegmentCompletionProtocol.Response response;

        _segmentCompletionMgr._seconds = 5L;

        // S1 reports first.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S3 reports and is asked to catch up; the response offset must match _s2Offset.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s3Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(response, _s2Offset);

        // S1 reports again and is asked to catch up.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s1Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);

        // S2 reports again and is told to commit.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // Swap the manager; the field is re-read on every use below rather than cached in a local.
        replaceSegmentCompletionManager();

        // S3 reports at _s2Offset after the swap.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_3)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S1 reports at _s2Offset after the swap.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_1)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 tries to start the commit it was granted before the swap and is held instead.
        // assertEquals (rather than assertTrue(a.equals(b))) reports the actual status on failure
        // and does not throw a NullPointerException if the status is null.
        _segmentCompletionMgr._seconds += 1;
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        Assert.assertEquals(_segmentCompletionMgr.segmentCommitStart(params).getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 falls back to reporting its offset and is told to commit.
        params = new SegmentCompletionProtocol.Request.Params()
                .withInstanceId(S_2)
                .withStreamPartitionMsgOffset(_s2Offset.toString())
                .withSegmentName(_segmentNameStr);
        response = _segmentCompletionMgr.segmentConsumed(params);
        Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
    }

    /**
     * Drives three instances (S1, S2, S3) through segment completion, swaps out the completion
     * manager right after the committer has been chosen, and checks the responses handed out by
     * the replacement manager.
     *
     * <p>Sequence asserted below:
     * <ol>
     *   <li>S1 and S2 report their offsets and are told to HOLD.</li>
     *   <li>S3 reports its offset and is told to CATCH_UP to S2's offset.</li>
     *   <li>S1 reports again and is told to CATCH_UP; S2 reports again and is told to COMMIT.</li>
     *   <li>{@code replaceSegmentCompletionManager()} is called (presumably simulating the
     *       controller failure named in the test title; confirm against that helper).</li>
     *   <li>S3 and S1, now at S2's offset, are told to HOLD; S2's commit-start is answered with
     *       HOLD; S2's next segmentConsumed (carrying a segment location) is answered with
     *       COMMIT.</li>
     * </ol>
     */
    @Test
    public void testControllerFailureDuringSplitCommit() throws Exception {
        _segmentCompletionMgr._seconds = 5L;

        // S1 and S2 report in first; both are held.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_1)
                                .withStreamPartitionMsgOffset(_s1Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        _segmentCompletionMgr._seconds++;
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_2)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        _segmentCompletionMgr._seconds++;

        // S3 reports in and is asked to catch up to S2's offset.
        SegmentCompletionProtocol.Response s3Response =
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_3)
                                .withStreamPartitionMsgOffset(_s3Offset.toString())
                                .withSegmentName(_segmentNameStr));
        Assert.assertEquals(s3Response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        verifyOffset(s3Response, _s2Offset);
        _segmentCompletionMgr._seconds++;

        // S1 (still at its original offset) is asked to catch up; S2 is picked to commit.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_1)
                                .withStreamPartitionMsgOffset(_s1Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
        _segmentCompletionMgr._seconds++;
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_2)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);

        // From here on every call goes to whatever manager the helper installed, so the
        // _segmentCompletionMgr field is deliberately re-read on each call rather than cached.
        replaceSegmentCompletionManager();

        // S3 and S1 have reached S2's offset; both are held.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_3)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_1)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
        _segmentCompletionMgr._seconds++;

        // S2's commit-start is answered with HOLD. assertEquals (rather than
        // assertTrue(x.equals(y))) reports the actual status on failure and tolerates null.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentCommitStart(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_2)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.HOLD);

        // S2 reports consumed again, this time with a segment location, and is told to COMMIT.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_2)
                                .withStreamPartitionMsgOffset(_s2Offset.toString())
                                .withSegmentName(_segmentNameStr)
                                .withSegmentLocation(SegmentCompletionProtocol.PARAM_SEGMENT_LOCATION))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
    }

    /**
     * Checks that both {@code segmentConsumed} and {@code segmentCommitStart} answer with
     * NOT_LEADER after the fixture is rebuilt via {@code testCaseSetup(false, true)}.
     *
     * <p>NOTE(review): the two flags are presumably (isLeader, isConnected); confirm against
     * {@code testCaseSetup}.
     */
    @Test
    public void testNotLeader() throws Exception {
        testCaseSetup(false, true);

        // A consumed report is answered with NOT_LEADER.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentConsumed(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_1)
                                .withStreamPartitionMsgOffset(_s1Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);

        // A commit-start request is answered the same way.
        Assert.assertEquals(
                _segmentCompletionMgr.segmentCommitStart(
                        new SegmentCompletionProtocol.Request.Params()
                                .withInstanceId(S_1)
                                .withStreamPartitionMsgOffset(_s1Offset.toString())
                                .withSegmentName(_segmentNameStr))
                        .getStatus(),
                SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER);
    }

    /**
     * Builds a Mockito mock of {@link HelixManager} with its leadership and connection queries
     * stubbed to fixed answers.
     *
     * @param z value the mock returns from {@code isLeader()}
     * @param z2 value the mock returns from {@code isConnected()}
     * @return the stubbed mock
     */
    private static HelixManager createMockHelixManager(boolean z, boolean z2) {
        HelixManager mockManager = Mockito.mock(HelixManager.class);
        Mockito.when(mockManager.isLeader()).thenReturn(z);
        Mockito.when(mockManager.isConnected()).thenReturn(z2);
        return mockManager;
    }
}
