package org.apache.pinot.broker.broker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/broker/HelixBrokerStarterTest.class */
/**
 * Tests a {@link HelixBrokerStarter} running against a ZooKeeper and controller started by
 * {@link ControllerTest}.
 *
 * <p>{@link #setUp()} starts one real broker plus fake broker/server instances, then registers a schema,
 * an OFFLINE and a REALTIME table named {@value #RAW_TABLE_NAME}, and {@value #NUM_OFFLINE_SEGMENTS}
 * OFFLINE segments. The test methods then verify the broker configuration, the broker resource
 * assignment and routing, and the time boundary maintained by the broker's routing manager.
 */
public class HelixBrokerStarterTest extends ControllerTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
    private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
    private static final int NUM_BROKERS = 3;
    private static final int NUM_SERVERS = 1;
    private static final int NUM_OFFLINE_SEGMENTS = 5;
    private static final int EXPECTED_VERSION = -1;

    /** Name of the Helix resource inspected for broker-to-table assignment. */
    private static final String BROKER_RESOURCE_NAME = "brokerResource";
    /** Placeholder download URL passed when adding or refreshing mocked segments. */
    private static final String DOWNLOAD_URL = "downloadUrl";
    /** Maximum time, in milliseconds, to wait for an asynchronous cluster change to become visible. */
    private static final long TIMEOUT_MS = 30000L;

    private HelixBrokerStarter _brokerStarter;

    /**
     * Starts ZooKeeper, the controller and the broker under test, adds the fake instances, and creates
     * the schema, tables and OFFLINE segments used by the tests.
     *
     * @throws Exception if any component fails to start or the cluster does not reach the expected state
     */
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();

        Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 18099);
        brokerProperties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
        brokerProperties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
        brokerProperties.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, true);
        brokerProperties.put(CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);

        _brokerStarter = new HelixBrokerStarter();
        _brokerStarter.init(new PinotConfiguration(brokerProperties));
        _brokerStarter.start();

        // The broker started above counts as one of the NUM_BROKERS brokers; the rest are fake instances.
        addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true);
        addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);

        Schema schema = new Schema.SchemaBuilder()
                .setSchemaName(RAW_TABLE_NAME)
                .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT, "EPOCH|DAYS", "1:DAYS")
                .build();
        _helixResourceManager.addSchema(schema, true);

        TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setTimeType(TimeUnit.DAYS.name())
                .build();
        _helixResourceManager.addTable(offlineTableConfig);

        TableConfig realtimeTableConfig = new TableConfigBuilder(TableType.REALTIME)
                .setTableName(RAW_TABLE_NAME)
                .setTimeColumnName(TIME_COLUMN_NAME)
                .setTimeType(TimeUnit.DAYS.name())
                .setStreamConfigs(getStreamConfigs())
                .build();
        _helixResourceManager.addTable(realtimeTableConfig);

        for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) {
            _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                    SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), DOWNLOAD_URL);
        }

        // Block until every OFFLINE segment shows up in the table's ExternalView.
        TestUtils.waitForCondition(aVoid -> {
            ExternalView offlineTableExternalView =
                    _helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME);
            return offlineTableExternalView != null
                    && offlineTableExternalView.getPartitionSet().size() == NUM_OFFLINE_SEGMENTS;
        }, TIMEOUT_MS, "Failed to find all OFFLINE segments in the ExternalView");
    }

    /**
     * Builds the stream configuration used for the REALTIME table.
     *
     * @return a new mutable map holding the stream type, consumer type, topic name and decoder class
     */
    private Map<String, String> getStreamConfigs() {
        Map<String, String> streamConfigs = new HashMap<>();
        streamConfigs.put(StreamConfigProperties.STREAM_TYPE, KafkaStreamConfigProperties.STREAM_TYPE);
        streamConfigs.put("stream.kafka.consumer.type", "highLevel");
        streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
        streamConfigs.put("stream.kafka.decoder.class.name",
                "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
        return streamConfigs;
    }

    /**
     * Verifies the effective broker configuration exposed by {@link HelixBrokerStarter#getConfig()}.
     */
    @Test
    public void testClusterConfigOverride() {
        PinotConfiguration config = _brokerStarter.getConfig();

        // These two keys are not set in setUp(); presumably they are supplied by the cluster-level
        // config that the broker merges into its own — confirm against HelixBrokerStarter.
        Assert.assertTrue(config.getProperty(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, false));
        Assert.assertEquals(config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, 0), 12);

        // This key is set explicitly to true in setUp() and must still be true.
        Assert.assertTrue(config.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false));
    }

    /**
     * Verifies broker tagging, the broker resource IdealState/ExternalView for the existing tables, the
     * routing table for the OFFLINE table (before and after adding a segment), and the handling of a
     * newly created table.
     *
     * @throws Exception if a table cannot be created or an expected cluster change is not observed in time
     */
    @Test
    public void testResourceAndTagAssignment() throws Exception {
        // All brokers carry the broker tag resolved for a null tenant name.
        Assert.assertEquals(
                _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
                        TagNameUtils.getBrokerTagForTenant(null)).size(),
                NUM_BROKERS);

        // Both existing tables are assigned to every broker in the IdealState ...
        IdealState brokerResourceIdealState =
                _helixAdmin.getResourceIdealState(getHelixClusterName(), BROKER_RESOURCE_NAME);
        Assert.assertEquals(brokerResourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(brokerResourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS);

        // ... and in the ExternalView.
        ExternalView brokerResourceExternalView =
                _helixAdmin.getResourceExternalView(getHelixClusterName(), BROKER_RESOURCE_NAME);
        Assert.assertEquals(brokerResourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(brokerResourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS);

        // The broker under test has routing for both tables.
        BrokerRoutingManager routingManager = _brokerStarter.getRoutingManager();
        Assert.assertTrue(routingManager.routingExists(OFFLINE_TABLE_NAME));
        Assert.assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME));

        // The routing table for the OFFLINE table maps all segments to the server instance(s).
        BrokerRequest brokerRequest =
                CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + OFFLINE_TABLE_NAME);
        RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest);
        Assert.assertNotNull(routingTable);
        Assert.assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
        Assert.assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(),
                NUM_OFFLINE_SEGMENTS);
        Assert.assertTrue(routingTable.getUnavailableSegments().isEmpty());

        // Adding one more segment must eventually be reflected in the routing table.
        _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
                SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), DOWNLOAD_URL);
        TestUtils.waitForCondition(
                aVoid -> routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values()
                        .iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1,
                TIMEOUT_MS, "Failed to add the new segment into the routing table");

        // Creating a table that references a non-existent broker tenant must be rejected.
        String newRawTableName = "newTable";
        String newTableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName);
        try {
            _helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE)
                    .setTableName(newRawTableName)
                    .setBrokerTenant("testBroker")
                    .build());
            Assert.fail("Table creation should fail as testBroker does not exist");
        } catch (InvalidTableConfigException ignored) {
            // Expected: the broker tenant "testBroker" does not exist.
        }

        // Creating the table without a broker tenant succeeds and the stored config reports DefaultTenant.
        _helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE)
                .setTableName(newRawTableName)
                .setServerTenant("DefaultTenant")
                .build());
        TableConfig newTableConfig = _helixResourceManager.getTableConfig(newTableNameWithType);
        Assert.assertNotNull(newTableConfig);
        Assert.assertEquals(newTableConfig.getTenantConfig().getBroker(), "DefaultTenant");

        // The new table is assigned to every broker in the IdealState immediately ...
        Assert.assertEquals(
                _helixAdmin.getResourceIdealState(getHelixClusterName(), BROKER_RESOURCE_NAME)
                        .getInstanceSet(newTableNameWithType).size(),
                NUM_BROKERS);

        // ... and in the ExternalView once the state transitions have been processed.
        TestUtils.waitForCondition(aVoid -> {
            Map<String, String> newTableStateMap =
                    _helixAdmin.getResourceExternalView(getHelixClusterName(), BROKER_RESOURCE_NAME)
                            .getStateMap(newTableNameWithType);
            return newTableStateMap != null && newTableStateMap.size() == NUM_BROKERS;
        }, TIMEOUT_MS, "Failed to find all brokers for the new table in the brokerResource ExternalView");

        Assert.assertTrue(routingManager.routingExists(newTableNameWithType));
    }

    /**
     * Verifies that the time boundary of the OFFLINE table is one less than the segment end time, and
     * that it follows a segment refreshed with a later end time.
     */
    @Test
    public void testTimeBoundaryUpdate() {
        BrokerRoutingManager routingManager = _brokerStarter.getRoutingManager();

        // The initial boundary is expected to be (end time - 1) for an end time of 10; presumably 10 is
        // the end time produced by SegmentMetadataMockUtils.mockSegmentMetadata — confirm.
        int currentEndTime = 10;
        TimeBoundaryInfo timeBoundaryInfo = routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME);
        Assert.assertNotNull(timeBoundaryInfo);
        Assert.assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1));

        // Refresh the first segment with a later end time.
        String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true).get(0);
        int newEndTime = currentEndTime + 10;
        _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME,
                SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh,
                        newEndTime),
                _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh),
                EXPECTED_VERSION, DOWNLOAD_URL);

        // The boundary must eventually move to (new end time - 1).
        TestUtils.waitForCondition(
                aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue()
                        .equals(Integer.toString(newEndTime - 1)),
                TIMEOUT_MS, "Failed to update the time boundary for refreshed segment");
    }

    /**
     * Stops the fake instances, the broker, the controller and ZooKeeper, in that order.
     *
     * <p>The calls are chained with {@code try/finally} so that a failure while stopping one component
     * does not prevent the remaining components from being stopped.
     */
    @AfterClass
    public void tearDown() {
        try {
            stopFakeInstances();
        } finally {
            try {
                _brokerStarter.stop();
            } finally {
                try {
                    stopController();
                } finally {
                    stopZk();
                }
            }
        }
    }
}
