package org.apache.pinot.broker.broker.helix;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Message;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/broker/helix/BaseBrokerStarter.class */
public abstract class BaseBrokerStarter implements ServiceStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BaseBrokerStarter.class);
    protected PinotConfiguration _brokerConf;
    protected List<ListenerConfig> _listenerConfigs;
    protected String _clusterName;
    protected String _zkServers;
    protected String _hostname;
    protected int _port;
    protected int _tlsPort;
    protected String _instanceId;
    private volatile boolean _isStarting = false;
    private volatile boolean _isShuttingDown = false;
    protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new ArrayList();
    protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new ArrayList();
    protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new ArrayList();
    protected final List<ClusterChangeHandler> _liveInstanceChangeHandlers = new ArrayList();
    protected HelixManager _spectatorHelixManager;
    protected HelixAdmin _helixAdmin;
    protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
    protected HelixDataAccessor _helixDataAccessor;
    protected PinotMetricsRegistry _metricsRegistry;
    protected BrokerMetrics _brokerMetrics;
    protected BrokerRoutingManager _routingManager;
    protected AccessControlFactory _accessControlFactory;
    protected BrokerRequestHandler _brokerRequestHandler;
    protected SqlQueryExecutor _sqlQueryExecutor;
    protected BrokerAdminApiApplication _brokerAdminApplication;
    protected ClusterChangeMediator _clusterChangeMediator;
    protected HelixManager _participantHelixManager;
    protected ServerRoutingStatsManager _serverRoutingStatsManager;

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public void init(PinotConfiguration pinotConfiguration) throws Exception {
        this._brokerConf = pinotConfiguration;
        this._zkServers = pinotConfiguration.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", "");
        this._clusterName = pinotConfiguration.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
        ServiceStartableUtils.applyClusterConfig(this._brokerConf, this._zkServers, this._clusterName, ServiceRole.BROKER);
        setupHelixSystemProperties();
        this._listenerConfigs = ListenerConfigUtil.buildBrokerConfigs(pinotConfiguration);
        this._hostname = pinotConfiguration.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_HOSTNAME);
        if (this._hostname == null) {
            this._hostname = this._brokerConf.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils.getHostnameOrAddress() : NetUtils.getHostAddress();
        }
        this._port = this._listenerConfigs.get(0).getPort();
        this._tlsPort = ListenerConfigUtil.findLastTlsPort(this._listenerConfigs, -1);
        this._instanceId = this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
        if (this._instanceId == null) {
            this._instanceId = this._brokerConf.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
        }
        if (this._instanceId == null) {
            this._instanceId = "Broker_" + this._hostname + "_" + this._port;
        }
        Preconditions.checkState(InstanceTypeUtils.isBroker(this._instanceId), "Instance id must have prefix '%s', got '%s'", CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE, this._instanceId);
        this._brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, this._instanceId);
    }

    private void setupHelixSystemProperties() {
        System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, this._brokerConf.getProperty(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS, "1"));
    }

    public int getPort() {
        return this._port;
    }

    public void addIdealStateChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._idealStateChangeHandlers.add(clusterChangeHandler);
    }

    public void addExternalViewChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._externalViewChangeHandlers.add(clusterChangeHandler);
    }

    public void addInstanceConfigChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._instanceConfigChangeHandlers.add(clusterChangeHandler);
    }

    public void addLiveInstanceChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._liveInstanceChangeHandlers.add(clusterChangeHandler);
    }

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public ServiceRole getServiceRole() {
        return ServiceRole.BROKER;
    }

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public String getInstanceId() {
        return this._instanceId;
    }

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public PinotConfiguration getConfig() {
        return this._brokerConf;
    }

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public void start() throws Exception {
        LOGGER.info("Starting Pinot broker (Version: {})", PinotVersion.VERSION);
        this._isStarting = true;
        Utils.logVersions();
        LOGGER.info("Connecting spectator Helix manager");
        this._spectatorHelixManager = HelixManagerFactory.getZKHelixManager(this._clusterName, this._instanceId, InstanceType.SPECTATOR, this._zkServers);
        this._spectatorHelixManager.connect();
        this._helixAdmin = this._spectatorHelixManager.getClusterManagmentTool();
        this._propertyStore = this._spectatorHelixManager.getHelixPropertyStore();
        this._helixDataAccessor = this._spectatorHelixManager.getHelixDataAccessor();
        LOGGER.info("Setting up broker request handler");
        this._metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(this._brokerConf.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
        this._brokerMetrics = new BrokerMetrics(this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX, CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX), this._metricsRegistry, this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, true), this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS, Collections.emptyList()));
        this._brokerMetrics.initializeGlobalMeters();
        this._brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1L);
        this._serverRoutingStatsManager = new ServerRoutingStatsManager(this._brokerConf);
        this._serverRoutingStatsManager.init();
        this._routingManager = new BrokerRoutingManager(this._brokerMetrics, this._serverRoutingStatsManager, this._brokerConf);
        this._routingManager.init(this._spectatorHelixManager);
        this._accessControlFactory = AccessControlFactory.loadFactory(this._brokerConf.subset(CommonConstants.Broker.ACCESS_CONTROL_CONFIG_PREFIX), this._propertyStore);
        HelixExternalViewBasedQueryQuotaManager helixExternalViewBasedQueryQuotaManager = new HelixExternalViewBasedQueryQuotaManager(this._brokerMetrics, this._instanceId);
        helixExternalViewBasedQueryQuotaManager.init(this._spectatorHelixManager);
        LOGGER.info("Initializing QueryRewriterFactory");
        QueryRewriterFactory.init(this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES));
        FunctionRegistry.init();
        TableCache tableCache = new TableCache(this._propertyStore, this._brokerConf.getProperty(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, false) || this._brokerConf.getProperty(CommonConstants.Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY, false));
        TlsConfig extractTlsConfig = TlsUtils.extractTlsConfig(this._brokerConf, CommonConstants.Broker.BROKER_TLS_PREFIX);
        NettyConfig extractNettyConfig = NettyConfig.extractNettyConfig(this._brokerConf, CommonConstants.Broker.BROKER_NETTY_PREFIX);
        String property = this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
        BaseBrokerRequestHandler grpcBrokerRequestHandler = this._brokerConf.getProperty(CommonConstants.Broker.BROKER_REQUEST_HANDLER_TYPE, "netty").equalsIgnoreCase(CommonConstants.Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE) ? new GrpcBrokerRequestHandler(this._brokerConf, property, this._routingManager, this._accessControlFactory, helixExternalViewBasedQueryQuotaManager, tableCache, this._brokerMetrics, null) : this._brokerConf.getProperty(CommonConstants.Broker.BROKER_NETTYTLS_ENABLED, false) ? new SingleConnectionBrokerRequestHandler(this._brokerConf, property, this._routingManager, this._accessControlFactory, helixExternalViewBasedQueryQuotaManager, tableCache, this._brokerMetrics, extractNettyConfig, extractTlsConfig, this._serverRoutingStatsManager) : new SingleConnectionBrokerRequestHandler(this._brokerConf, property, this._routingManager, this._accessControlFactory, helixExternalViewBasedQueryQuotaManager, tableCache, this._brokerMetrics, extractNettyConfig, null, this._serverRoutingStatsManager);
        MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
        if (this._brokerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, false)) {
            multiStageBrokerRequestHandler = new MultiStageBrokerRequestHandler(this._brokerConf, property, this._routingManager, this._accessControlFactory, helixExternalViewBasedQueryQuotaManager, tableCache, this._brokerMetrics);
        }
        this._brokerRequestHandler = new BrokerRequestHandlerDelegate(property, grpcBrokerRequestHandler, multiStageBrokerRequestHandler, this._brokerMetrics);
        this._brokerRequestHandler.start();
        String property2 = this._brokerConf.getProperty(CommonConstants.Broker.CONTROLLER_URL);
        if (property2 != null) {
            this._sqlQueryExecutor = new SqlQueryExecutor(property2);
        } else {
            this._sqlQueryExecutor = new SqlQueryExecutor(this._spectatorHelixManager);
        }
        LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(this._listenerConfigs));
        this._brokerAdminApplication = new BrokerAdminApiApplication(this._routingManager, this._brokerRequestHandler, this._brokerMetrics, this._brokerConf, this._sqlQueryExecutor, this._serverRoutingStatsManager, this._accessControlFactory);
        this._brokerAdminApplication.start(this._listenerConfigs);
        LOGGER.info("Initializing cluster change mediator");
        Iterator<ClusterChangeHandler> it2 = this._idealStateChangeHandlers.iterator();
        while (it2.hasNext()) {
            it2.next().init(this._spectatorHelixManager);
        }
        this._idealStateChangeHandlers.add(this._routingManager);
        Iterator<ClusterChangeHandler> it3 = this._externalViewChangeHandlers.iterator();
        while (it3.hasNext()) {
            it3.next().init(this._spectatorHelixManager);
        }
        this._externalViewChangeHandlers.add(this._routingManager);
        this._externalViewChangeHandlers.add(helixExternalViewBasedQueryQuotaManager);
        Iterator<ClusterChangeHandler> it4 = this._instanceConfigChangeHandlers.iterator();
        while (it4.hasNext()) {
            it4.next().init(this._spectatorHelixManager);
        }
        this._instanceConfigChangeHandlers.add(this._routingManager);
        this._instanceConfigChangeHandlers.add(helixExternalViewBasedQueryQuotaManager);
        Iterator<ClusterChangeHandler> it5 = this._liveInstanceChangeHandlers.iterator();
        while (it5.hasNext()) {
            it5.next().init(this._spectatorHelixManager);
        }
        HashMap hashMap = new HashMap();
        hashMap.put(HelixConstants.ChangeType.IDEAL_STATE, this._idealStateChangeHandlers);
        hashMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, this._externalViewChangeHandlers);
        hashMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, this._instanceConfigChangeHandlers);
        if (!this._liveInstanceChangeHandlers.isEmpty()) {
            hashMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, this._liveInstanceChangeHandlers);
        }
        this._clusterChangeMediator = new ClusterChangeMediator(hashMap, this._brokerMetrics);
        this._clusterChangeMediator.start();
        this._spectatorHelixManager.addIdealStateChangeListener(this._clusterChangeMediator);
        this._spectatorHelixManager.addExternalViewChangeListener(this._clusterChangeMediator);
        this._spectatorHelixManager.addInstanceConfigChangeListener(this._clusterChangeMediator);
        if (!this._liveInstanceChangeHandlers.isEmpty()) {
            this._spectatorHelixManager.addLiveInstanceChangeListener(this._clusterChangeMediator);
        }
        LOGGER.info("Connecting participant Helix manager");
        this._participantHelixManager = HelixManagerFactory.getZKHelixManager(this._clusterName, this._instanceId, InstanceType.PARTICIPANT, this._zkServers);
        this._participantHelixManager.getStateMachineEngine().registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), new BrokerResourceOnlineOfflineStateModelFactory(this._propertyStore, this._helixDataAccessor, this._routingManager, helixExternalViewBasedQueryQuotaManager));
        this._participantHelixManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), new BrokerUserDefinedMessageHandlerFactory(this._routingManager, helixExternalViewBasedQueryQuotaManager));
        this._participantHelixManager.connect();
        updateInstanceConfigAndBrokerResourceIfNeeded();
        this._brokerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> {
            return Long.valueOf(this._participantHelixManager.isConnected() ? 1L : 0L);
        });
        this._participantHelixManager.addPreConnectCallback(() -> {
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L);
        });
        registerServiceStatusHandler();
        this._isStarting = false;
        LOGGER.info("Finish starting Pinot broker");
    }

    private void updateInstanceConfigAndBrokerResourceIfNeeded() {
        InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(this._participantHelixManager, this._instanceId);
        boolean updateHostnamePort = HelixHelper.updateHostnamePort(instanceConfig, this._hostname, this._port);
        if (this._tlsPort > 0) {
            HelixHelper.updateTlsPort(instanceConfig, this._tlsPort);
        }
        boolean removeDisabledPartitions = updateHostnamePort | HelixHelper.removeDisabledPartitions(instanceConfig);
        boolean z = false;
        String str = null;
        if (instanceConfig.getTags().isEmpty()) {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(this._propertyStore)) {
                str = TagNameUtils.getBrokerTagForTenant(null);
                z = true;
            } else {
                str = CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
            }
            instanceConfig.addTag(str);
            removeDisabledPartitions = true;
        }
        if (removeDisabledPartitions) {
            HelixHelper.updateInstanceConfig(this._participantHelixManager, instanceConfig);
        }
        if (z) {
            long currentTimeMillis = System.currentTimeMillis();
            ArrayList arrayList = new ArrayList();
            HelixHelper.updateBrokerResource(this._participantHelixManager, this._instanceId, Collections.singletonList(str), arrayList, null);
            LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", this._instanceId, Long.valueOf(System.currentTimeMillis() - currentTimeMillis), arrayList);
        }
    }

    private void registerServiceStatusHandler() {
        ArrayList arrayList = new ArrayList(1);
        IdealState resourceIdealState = this._helixAdmin.getResourceIdealState(this._clusterName, "brokerResource");
        if (resourceIdealState != null && resourceIdealState.isEnabled()) {
            Iterator<String> it2 = resourceIdealState.getPartitionSet().iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                } else if (resourceIdealState.getInstanceSet(it2.next()).contains(this._instanceId)) {
                    arrayList.add("brokerResource");
                    break;
                }
            }
        }
        double property = this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START, 100.0d);
        LOGGER.info("Registering service status handler");
        ServiceStatus.setServiceStatusCallback(this._instanceId, new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of((ServiceStatus.LifecycleServiceStatusCallback) new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(this._participantHelixManager, this._clusterName, this._instanceId, arrayList, property), (ServiceStatus.LifecycleServiceStatusCallback) new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(this._participantHelixManager, this._clusterName, this._instanceId, arrayList, property), new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
    }

    private String getDefaultBrokerId() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            LOGGER.error("Caught exception while getting default broker Id", (Throwable) e);
            return "";
        }
    }

    @Override // org.apache.pinot.spi.services.ServiceStartable
    public void stop() {
        LOGGER.info("Shutting down Pinot broker");
        this._isShuttingDown = true;
        LOGGER.info("Disconnecting participant Helix manager");
        this._participantHelixManager.disconnect();
        LOGGER.info("Stopping cluster change mediator");
        this._clusterChangeMediator.stop();
        long property = this._brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 10000L);
        LOGGER.info("Wait for {}ms before shutting down request handler to finish the pending queries", Long.valueOf(property));
        try {
            Thread.sleep(property);
        } catch (Exception e) {
            LOGGER.error("Caught exception while waiting for shutdown delay of {}ms", Long.valueOf(property), e);
        }
        LOGGER.info("Shutting down request handler and broker admin application");
        this._brokerRequestHandler.shutDown();
        this._brokerAdminApplication.stop();
        LOGGER.info("Disconnecting spectator Helix manager");
        this._spectatorHelixManager.disconnect();
        LOGGER.info("Deregistering service status handler");
        ServiceStatus.removeServiceStatusCallback(this._instanceId);
        LOGGER.info("Shutdown Broker Metrics Registry");
        this._metricsRegistry.shutdown();
        LOGGER.info("Finish shutting down Pinot broker for {}", this._instanceId);
    }

    public boolean isStarting() {
        return this._isStarting;
    }

    public boolean isShuttingDown() {
        return this._isShuttingDown;
    }

    public HelixManager getSpectatorHelixManager() {
        return this._spectatorHelixManager;
    }

    public PinotMetricsRegistry getMetricsRegistry() {
        return this._metricsRegistry;
    }

    public BrokerMetrics getBrokerMetrics() {
        return this._brokerMetrics;
    }

    public BrokerRoutingManager getRoutingManager() {
        return this._routingManager;
    }

    public AccessControlFactory getAccessControlFactory() {
        return this._accessControlFactory;
    }

    public BrokerRequestHandler getBrokerRequestHandler() {
        return this._brokerRequestHandler;
    }
}
