package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SegmentGenerationMinionRealtimeIngestionTest.class */
/**
 * Integration test that triggers {@code SegmentGenerationAndPushTask} for the realtime table
 * {@code mytable}, once through the controller's ad-hoc {@code /tasks/execute} endpoint and once
 * through the {@code /tasks/schedule} endpoint, and then checks the document and segment counts
 * reported by a {@code COUNT(*)} query.
 */
public class SegmentGenerationMinionRealtimeIngestionTest extends BaseClusterIntegrationTest {
    private static final String REALTIME_TABLE_NAME = "mytable";
    private static final String REALTIME_TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
    private static final String REALTIME_SCHEMA_NAME = "mytable";

    /** Task type submitted to the controller in both tests. */
    private static final String TASK_TYPE = "SegmentGenerationAndPushTask";

    /** Document count the table must report before a test proceeds to its assertions. */
    private static final long EXPECTED_TOTAL_DOCS = 115545L;

    /** Value expected in the {@code numSegmentsQueried} field of the final query response. */
    private static final int EXPECTED_NUM_SEGMENTS_QUERIED = 14;

    /** Interval between two document-count polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000L;

    /** Maximum time to wait for the expected document count, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 600000L;

    /** Headers sent with every task request to the controller. */
    private static final Map<String, String> JSON_ACCEPT_HEADER =
            Collections.singletonMap("accept", "application/json");

    private TableConfig _realtimeTableConfig;

    /**
     * Prepares the working directories, starts the cluster components (ZooKeeper, controller,
     * broker, server, Kafka, minion) and registers the schema and realtime table config.
     *
     * @throws Exception if any start-up or registration step fails
     */
    @BeforeTest
    public void setUp() throws Exception {
        TestUtils.ensureDirectoriesExistAndEmpty(new File[]{_tempDir, _segmentDir, _tarDir});
        startZk();
        startController();
        startBroker();
        startServer();
        startKafka();
        startMinion();
        addSchemaWithCustomSchemaName(createSchema(), REALTIME_SCHEMA_NAME);
        // The realtime table config is built from the first unpacked Avro file.
        File firstAvroFile = (File) unpackAvroData(_tempDir).get(0);
        _realtimeTableConfig = createRealtimeTableConfig(firstAvroFile);
        addTableConfig(_realtimeTableConfig);
    }

    /**
     * Stops the cluster components in reverse start order. The temp directory is removed even
     * when one of the stop calls throws.
     */
    @AfterTest
    public void tearDown() {
        try {
            stopMinion();
            stopKafka();
            stopServer();
            stopBroker();
            stopController();
            stopZk();
        } finally {
            FileUtils.deleteQuietly(_tempDir);
        }
    }

    /**
     * Renames the given schema and uploads it through the controller request client.
     *
     * @param schema schema to upload; its name is overwritten in place
     * @param schemaName name to assign before uploading
     * @throws IOException if the controller request fails
     */
    private void addSchemaWithCustomSchemaName(Schema schema, String schemaName) throws IOException {
        schema.setSchemaName(schemaName);
        getControllerRequestClient().addSchema(schema);
    }

    /**
     * Submits an ad-hoc task for the table via {@code /tasks/execute} and verifies the resulting
     * document and segment counts.
     *
     * @throws Exception if the request, the polling or the final query fails
     */
    @Test
    public void testAdhocIngestionIntoRealtimeTable() throws Exception {
        Map<String, String> taskConfigs = createTaskConfigs();
        // No explicit task name is supplied; the typed null keeps the constructor call unambiguous.
        final String taskName = null;
        AdhocTaskConfig adhocTaskConfig =
                new AdhocTaskConfig(TASK_TYPE, REALTIME_TABLE_NAME, taskName, taskConfigs);

        sendPostRequest(getControllerBaseApiUrl() + "/tasks/execute",
                JsonUtils.objectToString(adhocTaskConfig), JSON_ACCEPT_HEADER);

        waitForAllDocsLoaded();
        assertExpectedSegmentsQueried();
    }

    /**
     * Attaches task and batch-ingestion configs to the realtime table, triggers the task via
     * {@code /tasks/schedule} and verifies the resulting document and segment counts.
     *
     * @throws Exception if the config update, the request, the polling or the final query fails
     */
    @Test
    public void testScheduledIngestionIntoRealtimeTable() throws Exception {
        // The same map instance backs both the task config and the batch config.
        Map<String, String> taskConfigs = createTaskConfigs();
        TableTaskConfig tableTaskConfig =
                new TableTaskConfig(Collections.singletonMap(TASK_TYPE, taskConfigs));
        BatchIngestionConfig batchIngestionConfig =
                new BatchIngestionConfig(List.of(taskConfigs), "APPEND", "DAILY");
        IngestionConfig ingestionConfig = new IngestionConfig();
        ingestionConfig.setBatchIngestionConfig(batchIngestionConfig);

        _realtimeTableConfig.setIngestionConfig(ingestionConfig);
        _realtimeTableConfig.setTaskConfig(tableTaskConfig);
        updateTableConfig(_realtimeTableConfig);

        String scheduleUrl = getControllerBaseApiUrl() + "/tasks/schedule?taskType=" + TASK_TYPE
                + "&tableName=" + REALTIME_TABLE_NAME_WITH_TYPE;
        sendPostRequest(scheduleUrl, null, JSON_ACCEPT_HEADER);

        waitForAllDocsLoaded();
        assertExpectedSegmentsQueried();
    }

    /**
     * Builds the task configuration shared by both tests: the temp directory as input location
     * and Avro as input format.
     *
     * @return a new mutable map holding the two settings
     */
    private Map<String, String> createTaskConfigs() {
        Map<String, String> taskConfigs = new HashMap<>();
        taskConfigs.put("inputDirURI", _tempDir.getAbsolutePath());
        taskConfigs.put("inputFormat", "avro");
        return taskConfigs;
    }

    /**
     * Polls the table's document count until it equals {@link #EXPECTED_TOTAL_DOCS}, asking
     * {@code TestUtils.waitForCondition} to raise an error if the timeout elapses first.
     *
     * @throws Exception if waiting fails
     */
    private void waitForAllDocsLoaded() throws Exception {
        TestUtils.waitForCondition(aVoid -> {
            try {
                return getTotalDocs(REALTIME_TABLE_NAME) == EXPECTED_TOTAL_DOCS;
            } catch (Exception ignored) {
                // A failed query counts as "not ready yet"; the next poll retries it.
                return false;
            }
        }, POLL_INTERVAL_MS, POLL_TIMEOUT_MS, "Failed to load " + EXPECTED_TOTAL_DOCS + " documents", true);
    }

    /**
     * Asserts that a {@code COUNT(*)} query on the table reports
     * {@link #EXPECTED_NUM_SEGMENTS_QUERIED} in its {@code numSegmentsQueried} field.
     *
     * @throws Exception if the query fails
     */
    private void assertExpectedSegmentsQueried() throws Exception {
        JsonNode response = postQuery("SELECT COUNT(*) FROM " + REALTIME_TABLE_NAME);
        Assert.assertEquals(response.get("numSegmentsQueried").asInt(), EXPECTED_NUM_SEGMENTS_QUERIED);
    }

    /**
     * Runs {@code SELECT COUNT(*)} against the given table and extracts the count.
     *
     * @param tableName table to query
     * @return the first cell of the first result row, or 0 when the response has no
     *         {@code resultTable} field
     * @throws Exception if the query fails
     */
    private int getTotalDocs(String tableName) throws Exception {
        JsonNode resultTable = postQuery("SELECT COUNT(*) FROM " + tableName).get("resultTable");
        if (resultTable == null) {
            return 0;
        }
        return resultTable.get("rows").get(0).get(0).asInt();
    }
}
