package org.apache.pinot.integration.tests;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/KafkaIncreaseDecreasePartitionsIntegrationTest.class */
public class KafkaIncreaseDecreasePartitionsIntegrationTest extends BaseRealtimeClusterIntegrationTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaIncreaseDecreasePartitionsIntegrationTest.class);
    private static final String KAFKA_TOPIC = "meetup";
    private static final int NUM_PARTITIONS = 1;

    String getExternalView(String str) throws IOException {
        return sendGetRequest(getControllerRequestURLBuilder().forExternalView(str));
    }

    void pauseTable(String str) throws IOException {
        sendPostRequest(getControllerRequestURLBuilder().forPauseConsumption(str));
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(((PauseStatusDetails) JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(str)), PauseStatusDetails.class)).getConsumingSegments().isEmpty());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to pause table: " + str);
    }

    void resumeTable(String str) throws IOException {
        sendPostRequest(getControllerRequestURLBuilder().forResumeConsumption(str));
        TestUtils.waitForCondition(r6 -> {
            try {
                return Boolean.valueOf(!((PauseStatusDetails) JsonUtils.stringToObject(sendGetRequest(getControllerRequestURLBuilder().forPauseStatus(str)), PauseStatusDetails.class)).getConsumingSegments().isEmpty());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, 60000L, "Failed to resume table: " + str);
    }

    String createTable() throws IOException {
        addSchema(createSchema("simpleMeetup_schema.json"));
        TableConfig tableConfig = (TableConfig) JsonUtils.inputStreamToObject(getClass().getClassLoader().getResourceAsStream("simpleMeetup_realtime_table_config.json"), TableConfig.class);
        addTableConfig(tableConfig);
        return tableConfig.getTableName();
    }

    void waitForNumConsumingSegmentsInEV(String str, int i) {
        TestUtils.waitForCondition(r7 -> {
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                ((TableViews.TableView) JsonUtils.stringToObject(getExternalView(str), TableViews.TableView.class))._realtime.values().forEach(map -> {
                    atomicInteger.addAndGet((int) map.values().stream().filter(str2 -> {
                        return str2.equals("CONSUMING");
                    }).count());
                });
                return Boolean.valueOf(atomicInteger.get() == i);
            } catch (IOException e) {
                LOGGER.error("Exception in waitForNumConsumingSegments: {}", e.getMessage());
                return false;
            }
        }, 5000L, 300000L, "Failed to wait for " + i + " consuming segments for table: " + str);
    }

    @Test
    public void testDecreasePartitions() throws Exception {
        LOGGER.info("Starting testDecreasePartitions");
        LOGGER.info("Creating Kafka topic with {} partitions", 3);
        ((StreamDataServerStartable) this._kafkaStarters.get(0)).createTopic(KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(3));
        String createTable = createTable();
        waitForNumConsumingSegmentsInEV(createTable, 3);
        pauseTable(createTable);
        LOGGER.info("Deleting Kafka topic");
        ((StreamDataServerStartable) this._kafkaStarters.get(0)).deleteTopic(KAFKA_TOPIC);
        LOGGER.info("Creating Kafka topic with {} partitions", 1);
        ((StreamDataServerStartable) this._kafkaStarters.get(0)).createTopic(KAFKA_TOPIC, KafkaStarterUtils.getTopicCreationProps(1));
        resumeTable(createTable);
        waitForNumConsumingSegmentsInEV(createTable, 1);
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testDictionaryBasedQueries(boolean z) {
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testGeneratedQueries(boolean z) {
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testHardcodedQueries(boolean z) {
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testInstanceShutdown() {
    }

    @Override // org.apache.pinot.integration.tests.BaseRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testQueriesFromQueryFile(boolean z) {
    }

    @Test(enabled = false)
    public void testQueryExceptions(boolean z) {
    }

    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testHardcodedServerPartitionedSqlQueries() {
    }
}
