package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
import org.apache.pinot.integration.tests.realtime.utils.PauselessRealtimeTestUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

/* loaded from: input_file:org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.class */
/**
 * Base class for integration tests that exercise pauseless realtime ingestion while a test fault is injected
 * into the controller's realtime segment manager.
 *
 * <p>{@link #setUp()} starts the cluster, creates a reference realtime table named
 * {@link #DEFAULT_TABLE_NAME_2} from the stream configs produced by {@code createRealtimeTableConfig}, enables the
 * fault named by {@link #getFailurePoint()}, and then creates a second table whose stream configs are moved into a
 * {@link StreamIngestionConfig} with pauseless consumption enabled. Subclasses supply the failure point and the
 * segment / metadata / row counts expected while the fault is active.
 */
public abstract class BasePauselessRealtimeIngestionTest extends BaseClusterIntegrationTest {
    /** Segment count passed to the ideal-state verification once ingestion has fully recovered. */
    protected static final int NUM_REALTIME_SEGMENTS = 48;
    /** Value returned by {@link #getCountStarResult()} when no failure is enabled. */
    protected static final long DEFAULT_COUNT_STAR_RESULT = 115545;
    /** Name of the reference table created by {@link #setUp()} before the fault is injected. */
    protected static final String DEFAULT_TABLE_NAME_2 = "mytable_2";
    /**
     * Segment completion timeout pushed into the failure-injecting segment manager; also the time
     * {@link #runValidationAndVerify()} sleeps before triggering validation.
     */
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 10000;
    /** Timeout used for every "wait for docs loaded" call in this class. */
    private static final long WAIT_DOCS_LOADED_TIMEOUT_MILLIS = 600000L;
    /** Polling interval used for every {@code TestUtils.waitForCondition} call in this class. */
    private static final long WAIT_CONDITION_INTERVAL_MILLIS = 1000L;
    /** Timeout used for every {@code TestUtils.waitForCondition} call in this class. */
    private static final long WAIT_CONDITION_TIMEOUT_MILLIS = 100000L;
    private static final Logger LOGGER = LoggerFactory.getLogger(BasePauselessRealtimeIngestionTest.class);

    /** Avro input files unpacked into the temp directory by {@link #setUp()}. */
    protected List<File> _avroFiles;
    /** {@code true} between {@link #injectFailure()} and {@link #disableFailure()}. */
    protected boolean _failureEnabled = false;

    /** Returns the name of the test fault to enable on the failure-injecting segment manager. */
    protected abstract String getFailurePoint();

    /** Returns the segment count expected in the ideal state while the fault is active. */
    protected abstract int getExpectedSegmentsWithFailure();

    /** Returns the number of segment ZK metadata records expected while the fault is active. */
    protected abstract int getExpectedZKMetadataWithFailure();

    /** Returns the {@code COUNT(*)} result expected while the fault is active. */
    protected abstract long getCountStarResultWithFailure();

    /** Returns a controller starter of type {@link FailureInjectingControllerStarter}. */
    public BaseControllerStarter createControllerStarter() {
        return new FailureInjectingControllerStarter();
    }

    /**
     * Adds the controller properties this test relies on.
     *
     * @param map mutable controller configuration to add the properties to
     */
    protected void overrideControllerConf(Map<String, Object> map) {
        map.put("controller.task.scheduler.enabled", true);
        map.put("controller.realtime.segment.deepStoreUploadRetryEnabled", true);
        // NOTE(review): 500 s is far longer than MAX_SEGMENT_COMPLETION_TIME_MILLIS; presumably this keeps the
        // periodic validation from running before runValidationAndVerify() triggers it manually - confirm.
        map.put("controller.realtime.segment.validation.initialDelayInSeconds", 500);
    }

    /**
     * Applies the parent broker overrides and additionally sets
     * {@code pinot.broker.use.mse.to.fill.empty.response.schema} to {@code true}.
     *
     * @param pinotConfiguration broker configuration to modify
     */
    protected void overrideBrokerConf(PinotConfiguration pinotConfiguration) {
        super.overrideBrokerConf(pinotConfiguration);
        pinotConfiguration.setProperty("pinot.broker.use.mse.to.fill.empty.response.schema", true);
    }

    /**
     * Points the server's segment store at the controller data directory (prefixed with {@code file:}) and enables
     * segment upload to the deep store.
     *
     * @param pinotConfiguration server configuration to modify
     * @throws RuntimeException wrapping a {@link URISyntaxException} if the controller data directory is not a
     *     valid URI; in that case no property is set
     */
    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        String dataDir = _controllerConfig.getDataDir();
        String scheme;
        try {
            // Only the URI parse can fail; it is used for logging but still validates the path up front.
            scheme = new URI(dataDir).getScheme();
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        LOGGER.info("Set segment.store.uri: {} for server with scheme: {}", dataDir, scheme);
        pinotConfiguration.setProperty("pinot.server.instance.segment.store.uri", "file:" + dataDir);
        pinotConfiguration.setProperty("pinot.server.instance.segment.upload.to.deep.store", "true");
    }

    /**
     * Starts ZooKeeper, controller, broker and server, creates the reference table, enables the fault, creates the
     * pauseless table and waits for all documents to be loaded.
     *
     * @throws Exception if any start-up or table creation step fails
     */
    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        startZk();
        startController();
        startBroker();
        startServer();
        setMaxSegmentCompletionTimeMillis();
        // The reference table is created before the fault is enabled; the pauseless table after.
        setupNonPauselessTable();
        injectFailure();
        setupPauselessTable();
        waitForAllDocsLoaded(WAIT_DOCS_LOADED_TIMEOUT_MILLIS);
    }

    /**
     * Unpacks the Avro data, starts Kafka and publishes the data, then creates the {@link #DEFAULT_TABLE_NAME_2}
     * schema and table and waits until its documents are loaded and every segment has a download URL.
     */
    private void setupNonPauselessTable() throws Exception {
        _avroFiles = unpackAvroData(_tempDir);
        startKafka();
        pushAvroIntoKafka(_avroFiles);

        Schema schema = createSchema();
        schema.setSchemaName(DEFAULT_TABLE_NAME_2);
        addSchema(schema);

        TableConfig tableConfig = newLongRetentionRealtimeTableConfig();
        tableConfig.setTableName(DEFAULT_TABLE_NAME_2);
        addTableConfig(tableConfig);

        String tableName = tableConfig.getTableName();
        waitForDocsLoaded(WAIT_DOCS_LOADED_TIMEOUT_MILLIS, true, tableName);
        waitForSegmentDownloadUrls(tableName);
    }

    /**
     * Creates the {@code mytable} schema and a realtime table whose stream configs are moved from the indexing
     * config into a {@link StreamIngestionConfig} with pauseless consumption enabled.
     */
    private void setupPauselessTable() throws Exception {
        Schema schema = createSchema();
        schema.setSchemaName("mytable");
        addSchema(schema);

        TableConfig tableConfig = newLongRetentionRealtimeTableConfig();
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setStreamIngestionConfig(
            new StreamIngestionConfig(List.of(tableConfig.getIndexingConfig().getStreamConfigs())));
        ingestionConfig.getStreamIngestionConfig().setPauselessConsumptionEnabled(true);
        // The stream configs now live in the ingestion config, so clear them from the indexing config.
        // The cast is kept exactly as in the original call.
        tableConfig.getIndexingConfig().setStreamConfigs((Map) null);
        tableConfig.setIngestionConfig(ingestionConfig);
        addTableConfig(tableConfig);
    }

    /**
     * Builds a realtime table config from the first Avro file with a retention of 100000 days.
     */
    private TableConfig newLongRetentionRealtimeTableConfig() {
        TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
        tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
        tableConfig.getValidationConfig().setRetentionTimeValue("100000");
        return tableConfig;
    }

    /**
     * Polls the segment ZK metadata of the given table until {@code PauselessRealtimeTestUtils.assertUrlPresent}
     * returns {@code true} for it.
     */
    private void waitForSegmentDownloadUrls(String tableName) {
        TestUtils.waitForCondition(
            aVoid -> PauselessRealtimeTestUtils.assertUrlPresent(_helixResourceManager.getSegmentsZKMetadata(tableName)),
            WAIT_CONDITION_INTERVAL_MILLIS, WAIT_CONDITION_TIMEOUT_MILLIS, "Some segments still have missing url");
    }

    /**
     * Sets the segment completion timeout on the segment manager when it is the failure-injecting implementation;
     * does nothing otherwise.
     */
    private void setMaxSegmentCompletionTimeMillis() {
        PinotLLCRealtimeSegmentManager segmentManager = _helixResourceManager.getRealtimeSegmentManager();
        if (segmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) {
            ((FailureInjectingPinotLLCRealtimeSegmentManager) segmentManager)
                .setMaxSegmentCompletionTimeoutMs(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
        }
    }

    /**
     * Enables the fault named by {@link #getFailurePoint()} when the segment manager supports it, and marks the
     * failure as enabled regardless of the manager type.
     */
    protected void injectFailure() {
        PinotLLCRealtimeSegmentManager segmentManager = _helixResourceManager.getRealtimeSegmentManager();
        if (segmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) {
            ((FailureInjectingPinotLLCRealtimeSegmentManager) segmentManager).enableTestFault(getFailurePoint());
        }
        _failureEnabled = true;
    }

    /**
     * Marks the failure as disabled and turns off the fault named by {@link #getFailurePoint()} when the segment
     * manager supports it.
     */
    protected void disableFailure() {
        _failureEnabled = false;
        PinotLLCRealtimeSegmentManager segmentManager = _helixResourceManager.getRealtimeSegmentManager();
        if (segmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) {
            ((FailureInjectingPinotLLCRealtimeSegmentManager) segmentManager).disableTestFault(getFailurePoint());
        }
    }

    /**
     * Drops the table returned by {@code getTableName()}, stops every cluster component and deletes the temp
     * directory. The directory is deleted even when one of the preceding steps throws.
     *
     * @throws IOException if dropping the table or deleting the temp directory fails
     */
    @AfterClass
    public void tearDown() throws IOException {
        LOGGER.info("Tearing down...");
        try {
            dropRealtimeTable(getTableName());
            stopServer();
            stopBroker();
            stopController();
            stopKafka();
            stopZk();
        } finally {
            // Always clean up on-disk state so a failed shutdown does not leak the directory.
            FileUtils.deleteDirectory(_tempDir);
        }
    }

    /**
     * Returns the expected {@code COUNT(*)} result: the failure-specific value while the fault is enabled,
     * otherwise {@link #DEFAULT_COUNT_STAR_RESULT}.
     */
    protected long getCountStarResult() {
        return _failureEnabled ? getCountStarResultWithFailure() : DEFAULT_COUNT_STAR_RESULT;
    }

    /**
     * Verifies the degraded state while the fault is active, then disables the fault, runs the realtime segment
     * validation manager once and verifies that the pauseless table converges to the same state as the reference
     * table.
     *
     * @throws Exception if any wait or verification step fails or the thread is interrupted while sleeping
     */
    public void runValidationAndVerify() throws Exception {
        String pauselessTable = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        String referenceTable = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2);

        // State expected while the fault is still active.
        PauselessRealtimeTestUtils.verifyIdealState(pauselessTable, getExpectedSegmentsWithFailure(), _helixManager);
        TestUtils.waitForCondition(
            aVoid -> _helixResourceManager.getSegmentsZKMetadata(pauselessTable).size()
                == getExpectedZKMetadataWithFailure(),
            WAIT_CONDITION_INTERVAL_MILLIS, WAIT_CONDITION_TIMEOUT_MILLIS, "New Segment ZK Metadata not created");

        // NOTE(review): presumably lets the configured segment completion timeout elapse so that the validation
        // run below acts on the stuck segments - confirm against the validation manager.
        Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
        disableFailure();
        _controllerStarter.getRealtimeSegmentValidationManager().run();

        // State expected after recovery.
        waitForAllDocsLoaded(WAIT_DOCS_LOADED_TIMEOUT_MILLIS);
        waitForDocsLoaded(WAIT_DOCS_LOADED_TIMEOUT_MILLIS, true, referenceTable);
        PauselessRealtimeTestUtils.verifyIdealState(pauselessTable, NUM_REALTIME_SEGMENTS, _helixManager);
        PauselessRealtimeTestUtils.verifyIdealState(referenceTable, NUM_REALTIME_SEGMENTS, _helixManager);
        waitForSegmentDownloadUrls(pauselessTable);
        PauselessRealtimeTestUtils.compareZKMetadataForSegments(
            _helixResourceManager.getSegmentsZKMetadata(pauselessTable),
            _helixResourceManager.getSegmentsZKMetadata(referenceTable));
    }

    /**
     * Verifies the ideal state of the pauseless table, asserts that pauseless consumption is enabled in its table
     * config, and waits until no segment is in {@code COMMITTING} status and every segment has a download URL.
     */
    public void testBasicSegmentAssignment() {
        String pauselessTable = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        PauselessRealtimeTestUtils.verifyIdealState(pauselessTable, NUM_REALTIME_SEGMENTS, _helixManager);
        Assert.assertTrue(PauselessConsumptionUtils.isPauselessEnabled(getRealtimeTableConfig()));
        TestUtils.waitForCondition(
            aVoid -> !hasSegmentsInStatus(_helixResourceManager.getSegmentsZKMetadata(pauselessTable),
                CommonConstants.Segment.Realtime.Status.COMMITTING),
            WAIT_CONDITION_INTERVAL_MILLIS, WAIT_CONDITION_TIMEOUT_MILLIS, "Some segments have status COMMITTING");
        waitForSegmentDownloadUrls(pauselessTable);
    }

    /**
     * Returns {@code true} if at least one entry of {@code list} has the given status.
     *
     * @param list segment ZK metadata records to inspect
     * @param status status to look for
     */
    private boolean hasSegmentsInStatus(List<SegmentZKMetadata> list, CommonConstants.Segment.Realtime.Status status) {
        return list.stream().anyMatch(metadata -> metadata.getStatus() == status);
    }
}
