package org.apache.pinot.broker.broker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.api.exception.InvalidTableConfigException;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.ServerRouteInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/broker/broker/HelixBrokerStarterTest.class */
public class HelixBrokerStarterTest extends ControllerTest {
    private static final String RAW_TABLE_NAME = "testTable";
    private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
    private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
    private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
    private static final int NUM_BROKERS = 3;
    private static final int NUM_SERVERS = 1;
    private static final int NUM_OFFLINE_SEGMENTS = 5;
    private static final int EXPECTED_VERSION = -1;
    private HelixBrokerStarter _brokerStarter;

    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        this._helixManager.getConfigAccessor().set(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()).build(), "default.hyperloglog.log2m", Integer.toString(12));
        HashMap hashMap = new HashMap();
        hashMap.put("pinot.broker.client.queryPort", 18099);
        hashMap.put("pinot.cluster.name", getHelixClusterName());
        hashMap.put("pinot.zk.server", getZkUrl());
        hashMap.put("pinot.broker.enable.query.limit.override", true);
        hashMap.put("pinot.broker.delayShutdownTimeMs", 0);
        hashMap.put("pinot.broker.default.query.limit", 1000);
        this._brokerStarter = new HelixBrokerStarter();
        this._brokerStarter.init(new PinotConfiguration(hashMap));
        this._brokerStarter.start();
        addFakeBrokerInstancesToAutoJoinHelixCluster(2, true);
        addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
        this._helixResourceManager.addSchema(new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT, "EPOCH|DAYS", "1:DAYS").build(), true, false);
        this._helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME).setTimeType(TimeUnit.DAYS.name()).build());
        this._helixResourceManager.addTable(new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME).setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).setNumReplicas(NUM_SERVERS).build());
        for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i += NUM_SERVERS) {
            this._helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
        }
        TestUtils.waitForCondition(r5 -> {
            ExternalView resourceExternalView = this._helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME);
            return Boolean.valueOf((resourceExternalView == null || resourceExternalView.getPartitionSet().size() != NUM_OFFLINE_SEGMENTS || this._helixAdmin.getResourceExternalView(getHelixClusterName(), REALTIME_TABLE_NAME) == null) ? false : true);
        }, 30000L, "Failed to find all OFFLINE segments in the ExternalView");
    }

    private Map<String, String> getStreamConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put("streamType", "kafka");
        hashMap.put("stream.kafka.topic.name", "kafkaTopic");
        hashMap.put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
        hashMap.put("stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
        return hashMap;
    }

    @Test
    public void testClusterConfigOverride() {
        PinotConfiguration config = this._brokerStarter.getConfig();
        Assert.assertTrue(config.getProperty("enable.case.insensitive", false));
        Assert.assertEquals(config.getProperty("default.hyperloglog.log2m", 0), 12);
        Assert.assertTrue(config.getProperty("pinot.broker.enable.query.limit.override", false));
        Assert.assertEquals(config.getProperty("pinot.broker.default.query.limit", NUM_SERVERS), 1000);
    }

    @Test
    public void testResourceAndTagAssignment() throws Exception {
        Assert.assertEquals(this._helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant((String) null)).size(), NUM_BROKERS);
        IdealState resourceIdealState = this._helixAdmin.getResourceIdealState(getHelixClusterName(), "brokerResource");
        Assert.assertEquals(resourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(resourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
        ExternalView resourceExternalView = this._helixAdmin.getResourceExternalView(getHelixClusterName(), "brokerResource");
        Assert.assertEquals(resourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS);
        Assert.assertEquals(resourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS);
        BrokerRoutingManager routingManager = this._brokerStarter.getRoutingManager();
        Assert.assertTrue(routingManager.routingExists(OFFLINE_TABLE_NAME));
        Assert.assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME));
        BrokerRequest compileToBrokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + OFFLINE_TABLE_NAME);
        RoutingTable routingTable = routingManager.getRoutingTable(compileToBrokerRequest, 0L);
        Assert.assertNotNull(routingTable);
        Assert.assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
        Assert.assertEquals(((ServerRouteInfo) routingTable.getServerInstanceToSegmentsMap().values().iterator().next()).getSegments().size(), NUM_OFFLINE_SEGMENTS);
        Assert.assertTrue(routingTable.getUnavailableSegments().isEmpty());
        this._helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
        TestUtils.waitForCondition(r7 -> {
            return Boolean.valueOf(((ServerRouteInfo) routingManager.getRoutingTable(compileToBrokerRequest, 0L).getServerInstanceToSegmentsMap().values().iterator().next()).getSegments().size() == 6);
        }, 30000L, "Failed to add the new segment into the routing table");
        String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType("newTable");
        try {
            this._helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName("newTable").setBrokerTenant("testBroker").build());
            Assert.fail("Table creation should fail as testBroker does not exist");
        } catch (InvalidTableConfigException e) {
        }
        addDummySchema("newTable");
        this._helixResourceManager.addTable(new TableConfigBuilder(TableType.OFFLINE).setTableName("newTable").setServerTenant("DefaultTenant").build());
        TableConfig tableConfig = this._helixResourceManager.getTableConfig(tableNameWithType);
        Assert.assertNotNull(tableConfig);
        Assert.assertEquals(tableConfig.getTenantConfig().getBroker(), "DefaultTenant");
        Assert.assertEquals(this._helixAdmin.getResourceIdealState(getHelixClusterName(), "brokerResource").getInstanceSet(tableNameWithType).size(), NUM_BROKERS);
        TestUtils.waitForCondition(r6 -> {
            Map stateMap = this._helixAdmin.getResourceExternalView(getHelixClusterName(), "brokerResource").getStateMap(tableNameWithType);
            return Boolean.valueOf(stateMap != null && stateMap.size() == NUM_BROKERS);
        }, 30000L, "Failed to find all brokers for the new table in the brokerResource ExternalView");
        Assert.assertTrue(routingManager.routingExists(tableNameWithType));
    }

    @Test
    public void testTimeBoundaryUpdate() {
        BrokerRoutingManager routingManager = this._brokerStarter.getRoutingManager();
        TimeBoundaryInfo timeBoundaryInfo = routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME);
        Assert.assertNotNull(timeBoundaryInfo);
        Assert.assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(10 - NUM_SERVERS));
        String str = (String) this._helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME, true).get(0);
        int i = 10 + 10;
        this._helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, str, i), this._helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, str), EXPECTED_VERSION, "downloadUrl");
        TestUtils.waitForCondition(r6 -> {
            return Boolean.valueOf(routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue().equals(Integer.toString(i - NUM_SERVERS)));
        }, 30000L, "Failed to update the time boundary for refreshed segment");
    }

    @AfterClass
    public void tearDown() {
        stopFakeInstances();
        this._brokerStarter.stop();
        stopController();
        stopZk();
    }
}
