package org.apache.pinot.integration.tests;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingControllerStarter;
import org.apache.pinot.integration.tests.realtime.utils.FailureInjectingPinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.class */
public class PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClusterIntegrationTest {
    private static final String DEFAULT_TABLE_NAME_2 = "mytable_2";
    private static final long MAX_SEGMENT_COMPLETION_TIME_MILLIS = 10000;

    protected void overrideControllerConf(Map<String, Object> map) {
        map.put("controller.task.scheduler.enabled", true);
        map.put("controller.realtime.segment.deepStoreUploadRetryEnabled", true);
        map.put("controller.realtime.segment.validation.initialDelayInSeconds", 500);
    }

    protected void overrideServerConf(PinotConfiguration pinotConfiguration) {
        pinotConfiguration.setProperty("pinot.server.instance.segment.store.uri", "file:" + this._controllerConfig.getDataDir());
        pinotConfiguration.setProperty("pinot.server.instance.segment.upload.to.deep.store", "true");
        pinotConfiguration.setProperty("pinot.server.instance.table.data.manager.provider.class", "org.apache.pinot.integration.tests.realtime.utils.FailureInjectingTableDataManagerProvider");
    }

    public BaseControllerStarter createControllerStarter() {
        return new FailureInjectingControllerStarter();
    }

    @BeforeClass
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{this._tempDir, this._segmentDir, this._tarDir});
        startZk();
        startController();
        startBroker();
        startServer();
        List unpackAvroData = unpackAvroData(this._tempDir);
        startKafka();
        pushAvroIntoKafka(unpackAvroData);
        setMaxSegmentCompletionTimeMillis();
        Schema createSchema = createSchema();
        createSchema.setSchemaName(DEFAULT_TABLE_NAME_2);
        addSchema(createSchema);
        TableConfig createRealtimeTableConfig = createRealtimeTableConfig((File) unpackAvroData.get(0));
        createRealtimeTableConfig.setTableName(DEFAULT_TABLE_NAME_2);
        createRealtimeTableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
        createRealtimeTableConfig.getValidationConfig().setRetentionTimeValue("100000");
        addTableConfig(createRealtimeTableConfig);
        waitForDocsLoaded(600000L, true, createRealtimeTableConfig.getTableName());
        createSchema.setSchemaName("mytable");
        addSchema(createSchema);
        TableConfig createRealtimeTableConfig2 = createRealtimeTableConfig((File) unpackAvroData.get(0));
        createRealtimeTableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS");
        createRealtimeTableConfig2.getValidationConfig().setRetentionTimeValue("100000");
        IndexingConfig indexingConfig = createRealtimeTableConfig2.getIndexingConfig();
        Map streamConfigs = indexingConfig.getStreamConfigs();
        indexingConfig.setStreamConfigs((Map) null);
        Assert.assertNotNull(streamConfigs);
        streamConfigs.put("realtime.segment.pauseless.download.timeoutSeconds", "10");
        IngestionConfig ingestionConfig = new IngestionConfig();
        StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(List.of(streamConfigs));
        streamIngestionConfig.setPauselessConsumptionEnabled(true);
        ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
        createRealtimeTableConfig2.setIngestionConfig(ingestionConfig);
        addTableConfig(createRealtimeTableConfig2);
        String tableName = createRealtimeTableConfig2.getTableName();
        TestUtils.waitForCondition(r5 -> {
            return Boolean.valueOf(getNumErrorSegmentsInEV(tableName) == 10);
        }, 600000L, "Segments still not in error state");
    }

    private void setMaxSegmentCompletionTimeMillis() {
        PinotLLCRealtimeSegmentManager realtimeSegmentManager = this._helixResourceManager.getRealtimeSegmentManager();
        if (realtimeSegmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) {
            ((FailureInjectingPinotLLCRealtimeSegmentManager) realtimeSegmentManager).setMaxSegmentCompletionTimeoutMs(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
        }
    }

    private int getNumErrorSegmentsInEV(String str) {
        ExternalView resourceExternalView = this._helixResourceManager.getHelixAdmin().getResourceExternalView(this._helixResourceManager.getHelixClusterName(), str);
        if (resourceExternalView == null) {
            return 0;
        }
        int i = 0;
        Iterator it = resourceExternalView.getRecord().getMapFields().values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((Map) it.next()).values().iterator();
            while (it2.hasNext()) {
                if (((String) it2.next()).equals("ERROR")) {
                    i++;
                }
            }
        }
        return i;
    }

    @Test
    public void testSegmentAssignment() throws Exception {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        Assert.assertFalse(getErrorSegmentsInEV(tableNameWithType).isEmpty(), "No segments found in ERROR state, expected at least one.");
        Thread.sleep(MAX_SEGMENT_COMPLETION_TIME_MILLIS);
        this._controllerStarter.getRealtimeSegmentValidationManager().run();
        TestUtils.waitForCondition(r5 -> {
            return Boolean.valueOf(getErrorSegmentsInEV(tableNameWithType).isEmpty());
        }, 600000L, "Some segments are still in ERROR state after resetSegments()");
        compareZKMetadataForSegments(this._helixResourceManager.getSegmentsZKMetadata(tableNameWithType), this._helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME_2)));
    }

    private List<String> getErrorSegmentsInEV(String str) {
        ExternalView resourceExternalView = this._helixResourceManager.getHelixAdmin().getResourceExternalView(this._helixResourceManager.getHelixClusterName(), str);
        if (resourceExternalView == null) {
            return List.of();
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : resourceExternalView.getRecord().getMapFields().entrySet()) {
            if (((Map) entry.getValue()).containsValue("ERROR")) {
                arrayList.add((String) entry.getKey());
            }
        }
        return arrayList;
    }

    private void compareZKMetadataForSegments(List<SegmentZKMetadata> list, List<SegmentZKMetadata> list2) {
        Map<String, SegmentZKMetadata> partitionSegmentNumberToMetadataMap = getPartitionSegmentNumberToMetadataMap(list);
        Map<String, SegmentZKMetadata> partitionSegmentNumberToMetadataMap2 = getPartitionSegmentNumberToMetadataMap(list2);
        partitionSegmentNumberToMetadataMap.forEach((str, segmentZKMetadata) -> {
            areSegmentZkMetadataSame(segmentZKMetadata, (SegmentZKMetadata) partitionSegmentNumberToMetadataMap2.get(str));
        });
    }

    private void areSegmentZkMetadataSame(SegmentZKMetadata segmentZKMetadata, SegmentZKMetadata segmentZKMetadata2) {
        if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) {
            return;
        }
        Assert.assertEquals(segmentZKMetadata.getStatus(), segmentZKMetadata2.getStatus());
        Assert.assertEquals(segmentZKMetadata.getStartOffset(), segmentZKMetadata2.getStartOffset());
        Assert.assertEquals(segmentZKMetadata.getEndOffset(), segmentZKMetadata2.getEndOffset());
        Assert.assertEquals(segmentZKMetadata.getTotalDocs(), segmentZKMetadata2.getTotalDocs());
        Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), segmentZKMetadata2.getStartTimeMs());
        Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata2.getEndTimeMs());
    }

    private Map<String, SegmentZKMetadata> getPartitionSegmentNumberToMetadataMap(List<SegmentZKMetadata> list) {
        HashMap hashMap = new HashMap();
        for (SegmentZKMetadata segmentZKMetadata : list) {
            LLCSegmentName lLCSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
            hashMap.put(lLCSegmentName.getPartitionGroupId() + "_" + lLCSegmentName.getSequenceNumber(), segmentZKMetadata);
        }
        return hashMap;
    }

    @AfterClass
    public void tearDown() throws IOException {
        dropRealtimeTable(getTableName());
        stopServer();
        stopBroker();
        stopController();
        stopKafka();
        stopZk();
        FileUtils.deleteDirectory(this._tempDir);
    }
}
