package org.apache.pinot.broker.broker.helix;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener;
import org.apache.pinot.spi.eventlistener.query.PinotBrokerQueryEventListenerFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/broker/helix/BaseBrokerStarter.class */
/**
 * Base starter for a Pinot broker.
 *
 * <p>Lifecycle: {@link #init(PinotConfiguration)} resolves the broker identity (hostname, ports, instance id) from
 * the configuration; {@link #start()} connects the spectator and participant Helix managers and brings up the
 * request handlers, the admin application and the cluster change mediator; {@link #stop()} tears them down.
 *
 * <p>Additional {@link ClusterChangeHandler}s may be registered through the {@code add*ChangeHandler} methods. They
 * are initialized and wired into the cluster change mediator inside {@link #start()}, so they must be registered
 * before {@link #start()} is called.
 */
public abstract class BaseBrokerStarter implements ServiceStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerStarter.class);

    // Values resolved by init().
    protected PinotConfiguration _brokerConf;
    protected List<ListenerConfig> _listenerConfigs;
    protected String _clusterName;
    protected String _zkServers;
    protected String _hostname;
    protected int _port;
    protected int _tlsPort;
    protected String _instanceId;

    // Lifecycle flags exposed through isStarting() / isShuttingDown() to the service status callback.
    private volatile boolean _isStarting = false;
    private volatile boolean _isShuttingDown = false;

    // Cluster change handlers, keyed by change type when handed to the mediator in start().
    protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new ArrayList<>();
    protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new ArrayList<>();
    protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new ArrayList<>();
    protected final List<ClusterChangeHandler> _liveInstanceChangeHandlers = new ArrayList<>();

    // Components created by start().
    protected HelixManager _spectatorHelixManager;
    protected HelixAdmin _helixAdmin;
    protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
    protected HelixDataAccessor _helixDataAccessor;
    protected PinotMetricsRegistry _metricsRegistry;
    protected BrokerMetrics _brokerMetrics;
    protected BrokerRoutingManager _routingManager;
    protected AccessControlFactory _accessControlFactory;
    protected BrokerRequestHandler _brokerRequestHandler;
    protected SqlQueryExecutor _sqlQueryExecutor;
    protected BrokerAdminApiApplication _brokerAdminApplication;
    protected ClusterChangeMediator _clusterChangeMediator;
    protected HelixManager _participantHelixManager;
    protected ServerRoutingStatsManager _serverRoutingStatsManager;
    protected BrokerQueryEventListener _brokerQueryEventListener;

    /**
     * Reads the broker configuration and resolves the ZooKeeper servers, cluster name, hostname, ports and
     * instance id. Also fills in defaults for the query runner port/hostname and writes the resolved instance id
     * back into the configuration.
     *
     * @param pinotConfiguration the broker configuration; it is retained and mutated by this starter
     * @throws IllegalStateException if {@code pinot.zk.server} is not configured, or if the resolved instance id
     *     is not a broker instance id
     * @throws Exception if applying the cluster config or building the listener configs fails
     */
    public void init(PinotConfiguration pinotConfiguration) throws Exception {
        _brokerConf = pinotConfiguration;

        // Fail with a descriptive message instead of a bare NullPointerException when the ZK address is missing.
        String zkServers = pinotConfiguration.getProperty("pinot.zk.server");
        Preconditions.checkState(zkServers != null, "Missing required broker config: '%s'", "pinot.zk.server");
        // Strip all whitespace from the configured ZK address string.
        _zkServers = zkServers.replaceAll("\\s+", "");
        _clusterName = pinotConfiguration.getProperty("pinot.cluster.name");
        ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER);

        // A query runner port of 0 (or unset) means "pick any open port".
        if (_brokerConf.getProperty("pinot.query.runner.port", 0) == 0) {
            _brokerConf.setProperty("pinot.query.runner.port", Integer.valueOf(NetUtils.findOpenPort()));
        }
        setupHelixSystemProperties();

        _listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(pinotConfiguration);
        _hostname = pinotConfiguration.getProperty("pinot.broker.hostname");
        if (_hostname == null) {
            _hostname = _brokerConf.getProperty("pinot.set.instance.id.to.hostname", false)
                    ? NetUtils.getHostnameOrAddress()
                    : NetUtils.getHostAddress();
        }
        if (!_brokerConf.containsKey("pinot.query.runner.hostname")) {
            _brokerConf.setProperty("pinot.query.runner.hostname", _hostname);
        }
        _port = _listenerConfigs.get(0).getPort();
        _tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, -1);

        // Instance id resolution order: explicit broker instance id, then the generic "instanceId" key, then a
        // value derived from hostname and port.
        _instanceId = _brokerConf.getProperty("pinot.broker.instance.id");
        if (_instanceId == null) {
            _instanceId = _brokerConf.getProperty("instanceId");
        }
        if (_instanceId == null) {
            _instanceId = "Broker_" + _hostname + "_" + _port;
        }
        Preconditions.checkState(InstanceTypeUtils.isBroker(_instanceId),
                "Instance id must have prefix '%s', got '%s'", "Broker_", _instanceId);
        _brokerConf.setProperty("pinot.broker.instance.id", _instanceId);
    }

    /** Publishes the configured flapping time window to Helix through a JVM system property. */
    private void setupHelixSystemProperties() {
        System.setProperty("helixmanager.flappingTimeWindow",
                _brokerConf.getProperty("pinot.broker.flapping.timeWindowMs", "1"));
    }

    /** Returns the port of the first configured listener, as resolved by {@link #init(PinotConfiguration)}. */
    public int getPort() {
        return _port;
    }

    /** Registers an extra ideal state change handler; it is initialized and wired up in {@link #start()}. */
    public void addIdealStateChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        _idealStateChangeHandlers.add(clusterChangeHandler);
    }

    /** Registers an extra external view change handler; it is initialized and wired up in {@link #start()}. */
    public void addExternalViewChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        _externalViewChangeHandlers.add(clusterChangeHandler);
    }

    /** Registers an extra instance config change handler; it is initialized and wired up in {@link #start()}. */
    public void addInstanceConfigChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        _instanceConfigChangeHandlers.add(clusterChangeHandler);
    }

    /** Registers an extra live instance change handler; it is initialized and wired up in {@link #start()}. */
    public void addLiveInstanceChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        _liveInstanceChangeHandlers.add(clusterChangeHandler);
    }

    /** Returns {@link ServiceRole#BROKER}. */
    public ServiceRole getServiceRole() {
        return ServiceRole.BROKER;
    }

    /** Returns the instance id resolved by {@link #init(PinotConfiguration)}. */
    public String getInstanceId() {
        return _instanceId;
    }

    /** Returns the broker configuration passed to {@link #init(PinotConfiguration)}. */
    public PinotConfiguration getConfig() {
        return _brokerConf;
    }

    /**
     * Starts the broker. In order: connects the spectator Helix manager, sets up metrics, routing, access control,
     * query quota and the request handlers, starts the admin application, wires the cluster change mediator,
     * connects the participant Helix manager, updates the instance config / broker resource, and registers the
     * service status callback.
     *
     * <p>{@link #init(PinotConfiguration)} must have been called first.
     *
     * @throws Exception if any component fails to start
     */
    public void start() throws Exception {
        LOGGER.info("Starting Pinot broker (Version: {})", PinotVersion.VERSION);
        LOGGER.info("Broker configs: {}", new PinotAppConfigs(getConfig()).toJSONString());
        _isStarting = true;
        Utils.logVersions();

        LOGGER.info("Connecting spectator Helix manager");
        _spectatorHelixManager =
                HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.SPECTATOR, _zkServers);
        _spectatorHelixManager.connect();
        _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
        _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
        _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();

        LOGGER.info("Setting up broker request handler");
        _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(_brokerConf.subset("pinot.broker.metrics"));
        _brokerMetrics = new BrokerMetrics(_brokerConf.getProperty("pinot.broker.metrics.prefix", "pinot.broker."),
                _metricsRegistry, _brokerConf.getProperty("pinot.broker.enableTableLevelMetrics", true),
                _brokerConf.getProperty("pinot.broker.allowedTablesForEmittingMetrics", Collections.emptyList()));
        _brokerMetrics.initializeGlobalMeters();
        _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1L);
        _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ADAPTIVE_SERVER_SELECTOR_TYPE,
                _brokerConf.getProperty("pinot.broker.adaptive.server.selector.type",
                        CommonConstants.Broker.AdaptiveServerSelector.DEFAULT_TYPE), 1L);
        BrokerMetrics.register(_brokerMetrics);

        _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
        _serverRoutingStatsManager.init();
        _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
        _routingManager.init(_spectatorHelixManager);

        PinotConfiguration accessControlConf = _brokerConf.subset("pinot.broker.access.control");
        accessControlConf.setProperty("pinot.cluster.name", _brokerConf.getProperty("pinot.cluster.name"));
        _accessControlFactory = AccessControlFactory.loadFactory(accessControlConf, _propertyStore);

        HelixExternalViewBasedQueryQuotaManager queryQuotaManager =
                new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
        queryQuotaManager.init(_spectatorHelixManager);

        LOGGER.info("Initializing QueryRewriterFactory");
        QueryRewriterFactory.init(_brokerConf.getProperty("pinot.broker.query.rewriter.class.names"));
        LOGGER.info("Initializing ResultRewriterFactory");
        ResultRewriterFactory.init(_brokerConf.getProperty("pinot.broker.result.rewriter.class.names"));
        FunctionRegistry.init();

        TableCache tableCache = new TableCache(_propertyStore, _brokerConf.getProperty("enable.case.insensitive", true));
        TlsConfig tlsConfig = TlsUtils.extractTlsConfig(_brokerConf, "pinot.broker.tls");
        NettyConfig nettyConfig = NettyConfig.extractNettyConfig(_brokerConf, "pinot.broker.netty");

        LOGGER.info("Initializing Broker Event Listener Factory");
        _brokerQueryEventListener = PinotBrokerQueryEventListenerFactory.getBrokerQueryEventListener(
                _brokerConf.subset("pinot.broker.event.listener"));

        String brokerId = _brokerConf.getProperty("pinot.broker.instance.id", getDefaultBrokerId());

        // Single-stage handler: gRPC when explicitly requested, otherwise the Netty-based handler (with the TLS
        // config only when Netty TLS is enabled).
        BaseBrokerRequestHandler singleStageRequestHandler;
        if (_brokerConf.getProperty("pinot.broker.request.handler.type", "netty").equalsIgnoreCase("grpc")) {
            singleStageRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
                    _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, null,
                    _brokerQueryEventListener);
        } else {
            TlsConfig nettyTlsConfig = _brokerConf.getProperty("pinot.broker.nettytls.enabled", false) ? tlsConfig : null;
            singleStageRequestHandler = new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId,
                    _routingManager, _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics,
                    nettyConfig, nettyTlsConfig, _serverRoutingStatsManager, _brokerQueryEventListener);
        }

        // Multi-stage handler is optional; the delegate receives null when the engine is disabled.
        MultiStageBrokerRequestHandler multiStageRequestHandler = null;
        if (_brokerConf.getProperty("pinot.multistage.engine.enabled", true)) {
            multiStageRequestHandler = new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
                    _accessControlFactory, queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener);
        }
        _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageRequestHandler,
                multiStageRequestHandler, _brokerMetrics);
        _brokerRequestHandler.start();

        ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
                _brokerConf.getProperty("pinot.broker.instance.enableThreadCpuTimeMeasurement", false));
        ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
                _brokerConf.getProperty("pinot.broker.instance.enableThreadAllocatedBytesMeasurement", false));
        Tracing.ThreadAccountantOps.initializeThreadAccountant(_brokerConf.subset("pinot.query.scheduler"), _instanceId);

        // Use the explicitly configured controller URL when present, otherwise go through the Helix manager.
        String controllerUrl = _brokerConf.getProperty("pinot.broker.controller.url");
        if (controllerUrl != null) {
            _sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
        } else {
            _sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
        }

        LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
        _brokerAdminApplication = new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics,
                _brokerConf, _sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory,
                _spectatorHelixManager);
        registerExtraComponents(_brokerAdminApplication);
        _brokerAdminApplication.start(_listenerConfigs);

        LOGGER.info("Initializing cluster change mediator");
        // Externally registered handlers are initialized here. The routing manager and the query quota manager
        // were already initialized above, so they are appended only after each loop.
        for (ClusterChangeHandler handler : _idealStateChangeHandlers) {
            handler.init(_spectatorHelixManager);
        }
        _idealStateChangeHandlers.add(_routingManager);
        for (ClusterChangeHandler handler : _externalViewChangeHandlers) {
            handler.init(_spectatorHelixManager);
        }
        _externalViewChangeHandlers.add(_routingManager);
        _externalViewChangeHandlers.add(queryQuotaManager);
        for (ClusterChangeHandler handler : _instanceConfigChangeHandlers) {
            handler.init(_spectatorHelixManager);
        }
        _instanceConfigChangeHandlers.add(_routingManager);
        _instanceConfigChangeHandlers.add(queryQuotaManager);
        for (ClusterChangeHandler handler : _liveInstanceChangeHandlers) {
            handler.init(_spectatorHelixManager);
        }

        Map<HelixConstants.ChangeType, List<ClusterChangeHandler>> changeHandlers = new HashMap<>();
        changeHandlers.put(HelixConstants.ChangeType.IDEAL_STATE, _idealStateChangeHandlers);
        changeHandlers.put(HelixConstants.ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers);
        changeHandlers.put(HelixConstants.ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers);
        // Live instance changes are only tracked when someone registered a handler for them.
        boolean hasLiveInstanceHandlers = !_liveInstanceChangeHandlers.isEmpty();
        if (hasLiveInstanceHandlers) {
            changeHandlers.put(HelixConstants.ChangeType.LIVE_INSTANCE, _liveInstanceChangeHandlers);
        }
        _clusterChangeMediator = new ClusterChangeMediator(changeHandlers, _brokerMetrics);
        _clusterChangeMediator.start();
        _spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
        _spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
        _spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
        if (hasLiveInstanceHandlers) {
            _spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
        }

        LOGGER.info("Connecting participant Helix manager");
        _participantHelixManager =
                HelixManagerFactory.getZKHelixManager(_clusterName, _instanceId, InstanceType.PARTICIPANT, _zkServers);
        // The state model and message handler factories are registered before connect().
        _participantHelixManager.getStateMachineEngine().registerStateModelFactory(
                BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(),
                new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor, _routingManager,
                        queryQuotaManager));
        _participantHelixManager.getMessagingService().registerMessageHandlerFactory(
                Message.MessageType.USER_DEFINE_MSG.toString(),
                new BrokerUserDefinedMessageHandlerFactory(_routingManager, queryQuotaManager));
        _participantHelixManager.connect();
        updateInstanceConfigAndBrokerResourceIfNeeded();

        _brokerMetrics.addCallbackGauge("helix.connected", () -> _participantHelixManager.isConnected() ? 1L : 0L);
        _participantHelixManager.addPreConnectCallback(
                () -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));

        registerServiceStatusHandler();
        _isStarting = false;
        LOGGER.info("Finish starting Pinot broker");
    }

    /**
     * Extension hook invoked from {@link #start()} after the admin application is created and before it is
     * started. The default implementation does nothing.
     *
     * @param brokerAdminApiApplication the admin application about to be started
     */
    protected void registerExtraComponents(BrokerAdminApiApplication brokerAdminApiApplication) {
    }

    /**
     * Brings the Helix instance config of this broker in line with the resolved hostname, ports and tags, and
     * persists it if anything changed. When tags were newly assigned from the tenant isolation setting or from the
     * configured instance tags, the broker resource is updated as well.
     */
    private void updateInstanceConfigAndBrokerResourceIfNeeded() {
        InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
        boolean instanceConfigUpdated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
        Map<String, String> simpleFields = instanceConfig.getRecord().getSimpleFields();
        if (_tlsPort > 0) {
            // NOTE(review): the result of updateTlsPort is not folded into instanceConfigUpdated (unchanged from
            // the original code) - confirm whether a TLS-port-only change is meant to be persisted.
            HelixHelper.updateTlsPort(instanceConfig, _tlsPort);
        }

        // A non-positive port makes updatePortIfNeeded() remove the mailbox port entry.
        int mailboxPort = _brokerConf.getProperty("pinot.multistage.engine.enabled", true)
                ? Integer.parseInt(_brokerConf.getProperty("pinot.query.runner.port"))
                : -1;
        instanceConfigUpdated |= updatePortIfNeeded(simpleFields, "queryMailboxPort", mailboxPort);
        instanceConfigUpdated |= HelixHelper.removeDisabledPartitions(instanceConfig);

        boolean shouldUpdateBrokerResource = false;
        List<String> instanceTags = instanceConfig.getTags();
        if (instanceTags.isEmpty()) {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
                instanceConfig.addTag(TagNameUtils.getBrokerTagForTenant((String) null));
                shouldUpdateBrokerResource = true;
            } else {
                String configuredTags = _brokerConf.getProperty("pinot.broker.instance.tags");
                if (StringUtils.isNotEmpty(configuredTags)) {
                    for (String tag : StringUtils.split(configuredTags, ',')) {
                        Preconditions.checkArgument(TagNameUtils.isBrokerTag(tag), "Illegal broker instance tag: %s",
                                tag);
                        instanceConfig.addTag(tag);
                    }
                    shouldUpdateBrokerResource = true;
                } else {
                    // The untagged case does not trigger a broker resource update.
                    instanceConfig.addTag("broker_untagged");
                }
            }
            instanceTags = instanceConfig.getTags();
            instanceConfigUpdated = true;
        }

        if (instanceConfigUpdated) {
            HelixHelper.updateInstanceConfig(_participantHelixManager, instanceConfig);
        }
        if (shouldUpdateBrokerResource) {
            long startTimeMs = System.currentTimeMillis();
            List<String> tablesAdded = new ArrayList<>();
            HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, instanceTags, tablesAdded, null);
            LOGGER.info("Updated broker resource for new joining broker: {} with instance tags: {} in {}ms, tables "
                    + "added: {}", _instanceId, instanceTags, System.currentTimeMillis() - startTimeMs, tablesAdded);
        }
    }

    /**
     * Registers the service status callback for this instance. The broker resource is monitored only when its
     * ideal state is enabled and at least one of its partitions lists this instance.
     */
    private void registerServiceStatusHandler() {
        List<String> resourcesToMonitor = new ArrayList<>(1);
        IdealState brokerResourceIdealState = _helixAdmin.getResourceIdealState(_clusterName, "brokerResource");
        if (brokerResourceIdealState != null && brokerResourceIdealState.isEnabled()) {
            for (String partition : brokerResourceIdealState.getPartitionSet()) {
                if (brokerResourceIdealState.getInstanceSet(partition).contains(_instanceId)) {
                    resourcesToMonitor.add("brokerResource");
                    break;
                }
            }
        }
        double minResourcePercent = _brokerConf.getProperty("pinot.broker.startup.minResourcePercent", 100.0d);
        LOGGER.info("Registering service status handler");
        ServiceStatus.setServiceStatusCallback(_instanceId, new ServiceStatus.MultipleCallbackServiceStatusCallback(
                ImmutableList.of(
                        new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_participantHelixManager,
                                _clusterName, _instanceId, resourcesToMonitor, minResourcePercent),
                        new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
                                _clusterName, _instanceId, resourcesToMonitor, minResourcePercent),
                        new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
    }

    /**
     * Returns the local host name as the default broker id, or an empty string when it cannot be determined (the
     * failure is logged).
     */
    private String getDefaultBrokerId() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            LOGGER.error("Caught exception while getting default broker Id", e);
            return "";
        }
    }

    /**
     * Sets or removes a port entry in the given simple fields.
     *
     * @param simpleFields the instance config simple fields to modify in place
     * @param key the simple field key holding the port
     * @param port the port to store; a value {@code <= 0} removes the entry instead
     * @return {@code true} if the map was modified, {@code false} if it already matched
     */
    private boolean updatePortIfNeeded(Map<String, String> simpleFields, String key, int port) {
        String existingPort = simpleFields.get(key);
        if (port <= 0) {
            if (existingPort == null) {
                return false;
            }
            LOGGER.info("Removing '{}' from instance: {}", key, _instanceId);
            simpleFields.remove(key);
            return true;
        }
        String newPort = Integer.toString(port);
        if (newPort.equals(existingPort)) {
            return false;
        }
        LOGGER.info("Updating '{}' for instance: {} to: {}", key, _instanceId, port);
        simpleFields.put(key, newPort);
        return true;
    }

    /**
     * Stops the broker. The participant Helix manager is disconnected first, then the cluster change mediator is
     * stopped, then - after the configured {@code pinot.broker.delayShutdownTimeMs} delay - the request handler,
     * admin application, spectator Helix manager, service status callback and metrics registry are shut down.
     *
     * <p>Components that were never created (for example because {@link #start()} failed part-way) are skipped, so
     * that the remaining ones are still released. If the calling thread is interrupted during the delay, the
     * teardown still completes and the interrupt status is restored before returning.
     */
    public void stop() {
        LOGGER.info("Shutting down Pinot broker");
        _isShuttingDown = true;
        boolean interrupted = false;
        try {
            if (_participantHelixManager != null) {
                LOGGER.info("Disconnecting participant Helix manager");
                _participantHelixManager.disconnect();
            }
            if (_clusterChangeMediator != null) {
                LOGGER.info("Stopping cluster change mediator");
                _clusterChangeMediator.stop();
            }
            if (_brokerRequestHandler != null) {
                // Only wait when there is a request handler that may still be serving queries.
                interrupted = waitForShutdownDelay();
            }
            LOGGER.info("Shutting down request handler and broker admin application");
            if (_brokerRequestHandler != null) {
                _brokerRequestHandler.shutDown();
            }
            if (_brokerAdminApplication != null) {
                _brokerAdminApplication.stop();
            }
            if (_spectatorHelixManager != null) {
                LOGGER.info("Disconnecting spectator Helix manager");
                _spectatorHelixManager.disconnect();
            }
            LOGGER.info("Deregistering service status handler");
            ServiceStatus.removeServiceStatusCallback(_instanceId);
            if (_metricsRegistry != null) {
                LOGGER.info("Shutdown Broker Metrics Registry");
                _metricsRegistry.shutdown();
            }
            LOGGER.info("Finish shutting down Pinot broker for {}", _instanceId);
        } finally {
            if (interrupted) {
                // Restore the interrupt status only after teardown, so that blocking shutdown calls above are not
                // aborted early while callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sleeps for the configured shutdown delay so that pending queries can finish. Any failure of the sleep is
     * logged and otherwise ignored.
     *
     * @return {@code true} if the thread was interrupted while waiting
     */
    private boolean waitForShutdownDelay() {
        long delayShutdownTimeMs = _brokerConf.getProperty("pinot.broker.delayShutdownTimeMs", 10000L);
        LOGGER.info("Wait for {}ms before shutting down request handler to finish the pending queries",
                delayShutdownTimeMs);
        try {
            Thread.sleep(delayShutdownTimeMs);
            return false;
        } catch (Exception e) {
            // Broad catch kept on purpose: an invalid (e.g. negative) delay must not abort the shutdown.
            LOGGER.error("Caught exception while waiting for shutdown delay of {}ms", delayShutdownTimeMs, e);
            return e instanceof InterruptedException;
        }
    }

    /** Returns {@code true} from the beginning of {@link #start()} until it completes successfully. */
    public boolean isStarting() {
        return _isStarting;
    }

    /** Returns {@code true} once {@link #stop()} has been invoked. */
    public boolean isShuttingDown() {
        return _isShuttingDown;
    }

    /** Returns the spectator Helix manager, or {@code null} before {@link #start()}. */
    public HelixManager getSpectatorHelixManager() {
        return _spectatorHelixManager;
    }

    /** Returns the metrics registry, or {@code null} before {@link #start()}. */
    public PinotMetricsRegistry getMetricsRegistry() {
        return _metricsRegistry;
    }

    /** Returns the broker metrics, or {@code null} before {@link #start()}. */
    public BrokerMetrics getBrokerMetrics() {
        return _brokerMetrics;
    }

    /** Returns the routing manager, or {@code null} before {@link #start()}. */
    public BrokerRoutingManager getRoutingManager() {
        return _routingManager;
    }

    /** Returns the access control factory, or {@code null} before {@link #start()}. */
    public AccessControlFactory getAccessControlFactory() {
        return _accessControlFactory;
    }

    /** Returns the broker request handler, or {@code null} before {@link #start()}. */
    public BrokerRequestHandler getBrokerRequestHandler() {
        return _brokerRequestHandler;
    }
}
