package org.apache.pinot.query.routing;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.hint.PinotHintOptions;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

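/**
 * Assigns workers (query server instances and, for leaf stages, the segments they scan) to every
 * fragment of a dispatchable query plan, using routing information from the {@link RoutingManager}.
 *
 * <p>Decompiled from: input_file:org/apache/pinot/query/routing/WorkerManager.class
 */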
public class WorkerManager {
    // Class-wide logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerManager.class);
    // Shared random source, e.g. for picking the server of a singleton intermediate stage.
    private static final Random RANDOM = new Random();
    // Host name used for the root-stage worker.
    private final String _hostName;
    // Port used for the root-stage worker (passed as both port arguments of QueryServerInstance).
    private final int _port;
    // Source of routing tables, time boundaries, partition info and enabled/serving server instances.
    private final RoutingManager _routingManager;

    /**
     * Servers and segments that back one partition of a partitioned table.
     *
     * <p>Either segment list may be {@code null} when the partition has no segments of that table type.
     * NOTE: the decompiler reports that the access modifier of this class was widened from private.
     */
    public static class ColocatedPartitionInfo {
        final Set<String> _fullyReplicatedServers;
        final List<String> _offlineSegments;
        final List<String> _realtimeSegments;

        /**
         * @param fullyReplicatedServers servers stored as the partition's fully replicated servers
         * @param offlineSegments offline segments of the partition, or {@code null}
         * @param realtimeSegments realtime segments of the partition, or {@code null}
         */
        public ColocatedPartitionInfo(Set<String> fullyReplicatedServers, @Nullable List<String> offlineSegments, @Nullable List<String> realtimeSegments) {
            _fullyReplicatedServers = fullyReplicatedServers;
            _offlineSegments = offlineSegments;
            _realtimeSegments = realtimeSegments;
        }
    }

    /**
     * Per-partition info of a partitioned table, indexed by partition id, plus the optional time
     * boundary (only set when both offline and realtime parts are combined).
     *
     * <p>NOTE: the decompiler reports that the access modifier of this class was widened from private.
     */
    public static class ColocatedTableInfo {
        final ColocatedPartitionInfo[] _partitionInfoMap;
        final TimeBoundaryInfo _timeBoundaryInfo;

        /**
         * @param partitionInfoMap partition info indexed by partition id; entries may be {@code null}
         * @param timeBoundaryInfo time boundary of the table, or {@code null}
         */
        ColocatedTableInfo(ColocatedPartitionInfo[] partitionInfoMap, @Nullable TimeBoundaryInfo timeBoundaryInfo) {
            _partitionInfoMap = partitionInfoMap;
            _timeBoundaryInfo = timeBoundaryInfo;
        }
    }

    /**
     * Creates a worker manager.
     *
     * @param hostName host name used for the root-stage worker
     * @param port port used for the root-stage worker
     * @param routingManager routing information provider
     */
    public WorkerManager(String hostName, int port, RoutingManager routingManager) {
        _hostName = hostName;
        _port = port;
        _routingManager = routingManager;
    }

    /**
     * Assigns workers to the whole plan rooted at {@code planFragment}.
     *
     * <p>The metadata stored under key 0 receives a single worker (id 0) running on this manager's
     * host and port; every child fragment is then handled recursively.
     *
     * @param planFragment root fragment of the plan
     * @param dispatchablePlanContext context holding the per-fragment metadata to fill in
     */
    public void assignWorkers(PlanFragment planFragment, DispatchablePlanContext dispatchablePlanContext) {
        DispatchablePlanMetadata rootMetadata = dispatchablePlanContext.getDispatchablePlanMetadataMap().get(0);
        QueryServerInstance rootServer = new QueryServerInstance(_hostName, _port, _port);
        rootMetadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0, rootServer));
        for (PlanFragment child : planFragment.getChildren()) {
            assignWorkersToNonRootFragment(child, dispatchablePlanContext);
        }
    }

    /**
     * Dispatches a non-root fragment to the leaf or intermediate assignment routine, depending on
     * whether its metadata describes a leaf plan.
     */
    private void assignWorkersToNonRootFragment(PlanFragment planFragment, DispatchablePlanContext dispatchablePlanContext) {
        DispatchablePlanMetadata metadata = dispatchablePlanContext.getDispatchablePlanMetadataMap().get(planFragment.getFragmentId());
        if (!isLeafPlan(metadata)) {
            assignWorkersToIntermediateFragment(planFragment, dispatchablePlanContext);
            return;
        }
        assignWorkersToLeafFragment(planFragment, dispatchablePlanContext);
    }

    /**
     * Returns {@code true} when the fragment scans exactly one table, which is what this class
     * treats as a leaf plan.
     */
    private static boolean isLeafPlan(DispatchablePlanMetadata metadata) {
        int numScannedTables = metadata.getScannedTables().size();
        return numScannedTables == 1;
    }

    /**
     * Assigns workers to a leaf fragment after first assigning workers to its children.
     *
     * <p>If the table options contain a partition-key hint, the partition-size hint is mandatory and
     * the fragment is assigned partition by partition; the partition-parallelism hint is optional and
     * defaults to 1. Without a partition-key hint the fragment is assigned from the routing table.
     *
     * @param planFragment leaf fragment to assign
     * @param dispatchablePlanContext context holding the per-fragment metadata to fill in
     * @throws IllegalStateException if the partition size hint is missing or not positive, or the
     *     partition parallelism hint is not positive
     * @throws NumberFormatException if a numeric hint is not a valid integer
     */
    private void assignWorkersToLeafFragment(PlanFragment planFragment, DispatchablePlanContext dispatchablePlanContext) {
        for (PlanFragment child : planFragment.getChildren()) {
            assignWorkersToNonRootFragment(child, dispatchablePlanContext);
        }
        DispatchablePlanMetadata metadata = dispatchablePlanContext.getDispatchablePlanMetadataMap().get(planFragment.getFragmentId());
        Map<String, String> tableOptions = metadata.getTableOptions();
        String partitionKey = tableOptions != null ? tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY) : null;
        if (partitionKey == null) {
            assignWorkersToNonPartitionedLeafFragment(metadata, dispatchablePlanContext);
            return;
        }

        String partitionSizeStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
        Preconditions.checkState(partitionSizeStr != null, "'%s' must be provided for partition key: %s", PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
        int numPartitions = Integer.parseInt(partitionSizeStr);
        Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s", PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);

        String partitionParallelismStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM);
        int partitionParallelism = partitionParallelismStr != null ? Integer.parseInt(partitionParallelismStr) : 1;
        // Two placeholders for two arguments, matching the PARTITION_SIZE check above (the previous
        // template had a third, never-filled "%s").
        Preconditions.checkState(partitionParallelism > 0, "'%s' must be positive, got: %s", PinotHintOptions.TableHintOptions.PARTITION_PARALLELISM, partitionParallelism);

        assignWorkersToPartitionedLeafFragment(metadata, dispatchablePlanContext, partitionKey, numPartitions, partitionParallelism);
    }

    /**
     * Assigns one worker per server that appears in the routing table(s) of the scanned table, and
     * records for each worker the segments to scan, keyed by table type name.
     *
     * <p>When routing exists for more than one table type, the time boundary of the offline table is
     * looked up: if present it is stored on the metadata, otherwise the offline routing entry is
     * dropped. Unavailable segments reported by a routing table are recorded on the metadata.
     *
     * @param dispatchablePlanMetadata metadata of the leaf fragment; updated in place
     * @param dispatchablePlanContext context providing the request id
     * @throws IllegalStateException if no routing entry exists for the table, or a server is seen
     *     twice for the same table type
     */
    private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata dispatchablePlanMetadata, DispatchablePlanContext dispatchablePlanContext) {
        String tableName = dispatchablePlanMetadata.getScannedTables().get(0);
        Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, dispatchablePlanContext.getRequestId());
        Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find routing entries for table: %s", tableName);
        if (routingTableMap.size() > 1) {
            String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
            TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName);
            if (timeBoundaryInfo != null) {
                dispatchablePlanMetadata.setTimeBoundaryInfo(timeBoundaryInfo);
            } else {
                // Without a time boundary the offline part cannot be combined; keep realtime only.
                routingTableMap.remove(TableType.OFFLINE.name());
            }
        }

        // server -> (table type name -> segments)
        Map<ServerInstance, Map<String, List<String>>> serverToSegmentsByType = new HashMap<>();
        for (Map.Entry<String, RoutingTable> routingEntry : routingTableMap.entrySet()) {
            String tableType = routingEntry.getKey();
            RoutingTable routingTable = routingEntry.getValue();
            for (Map.Entry<ServerInstance, List<String>> serverEntry : routingTable.getServerInstanceToSegmentsMap().entrySet()) {
                List<String> previous = serverToSegmentsByType.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()).put(tableType, serverEntry.getValue());
                // Guava templates only substitute "%s"; "{}" would leave the values un-interpolated.
                Preconditions.checkState(previous == null, "Entry for server %s and table type: %s already exist!", serverEntry.getKey(), tableType);
            }
            if (!routingTable.getUnavailableSegments().isEmpty()) {
                dispatchablePlanMetadata.addUnavailableSegments(tableName, routingTable.getUnavailableSegments());
            }
        }

        // Worker ids are handed out in the iteration order of the server map.
        int workerId = 0;
        Map<Integer, QueryServerInstance> workerIdToServer = new HashMap<>();
        Map<Integer, Map<String, List<String>>> workerIdToSegments = new HashMap<>();
        for (Map.Entry<ServerInstance, Map<String, List<String>>> entry : serverToSegmentsByType.entrySet()) {
            workerIdToServer.put(workerId, new QueryServerInstance(entry.getKey()));
            workerIdToSegments.put(workerId, entry.getValue());
            workerId++;
        }
        dispatchablePlanMetadata.setWorkerIdToServerInstanceMap(workerIdToServer);
        dispatchablePlanMetadata.setWorkerIdToSegmentsMap(workerIdToSegments);
    }

    /**
     * Looks up the routing table(s) for a table name.
     *
     * <p>For a name without a type suffix both the OFFLINE and REALTIME routing tables are queried;
     * otherwise only the table's own type is queried. Missing routing tables are simply left out.
     *
     * @param tableName table name, with or without type suffix
     * @param requestId request id forwarded to the routing manager
     * @return mutable map from table type name to routing table; possibly empty
     */
    private Map<String, RoutingTable> getRoutingTable(String tableName, long requestId) {
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
        Map<String, RoutingTable> routingTableMap = new HashMap<>();
        if (tableType != null) {
            RoutingTable typedRoutingTable = getRoutingTable(tableName, tableType, requestId);
            if (typedRoutingTable != null) {
                routingTableMap.put(tableType.name(), typedRoutingTable);
            }
            return routingTableMap;
        }
        RoutingTable offlineRoutingTable = getRoutingTable(rawTableName, TableType.OFFLINE, requestId);
        if (offlineRoutingTable != null) {
            routingTableMap.put(TableType.OFFLINE.name(), offlineRoutingTable);
        }
        RoutingTable realtimeRoutingTable = getRoutingTable(rawTableName, TableType.REALTIME, requestId);
        if (realtimeRoutingTable != null) {
            routingTableMap.put(TableType.REALTIME.name(), realtimeRoutingTable);
        }
        return routingTableMap;
    }

    /**
     * Fetches the routing table for one table type by compiling a {@code SELECT *} query against the
     * type-suffixed table name and handing it to the routing manager.
     *
     * @param tableName table name; any type suffix is stripped before the requested type is applied
     * @param tableType table type to route for
     * @param requestId request id forwarded to the routing manager
     * @return whatever the routing manager returns (callers in this class handle {@code null})
     */
    private RoutingTable getRoutingTable(String tableName, TableType tableType, long requestId) {
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
        return _routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
    }

    /**
     * Assigns exactly one worker per partition of a partitioned table; the worker id equals the
     * partition id.
     *
     * <p>For each partition an enabled server is picked among the partition's fully replicated
     * servers. The pick index starts at the request id and advances by one per partition.
     *
     * @param dispatchablePlanMetadata metadata of the leaf fragment; updated in place
     * @param dispatchablePlanContext context providing the request id
     * @param partitionKey partition key from the table hint
     * @param numPartitions number of partitions from the table hint
     * @param partitionParallelism parallelism recorded on the metadata
     * @throws IllegalStateException if a partition has no info or no enabled fully replicated server
     */
    private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata dispatchablePlanMetadata, DispatchablePlanContext dispatchablePlanContext, String partitionKey, int numPartitions, int partitionParallelism) {
        String tableName = dispatchablePlanMetadata.getScannedTables().get(0);
        ColocatedTableInfo tableInfo = getColocatedTableInfo(tableName, partitionKey, numPartitions);
        long indexToPick = dispatchablePlanContext.getRequestId();
        ColocatedPartitionInfo[] partitionInfos = tableInfo._partitionInfoMap;
        Map<Integer, QueryServerInstance> workerIdToServer = new HashMap<>();
        Map<Integer, Map<String, List<String>>> workerIdToSegments = new HashMap<>();
        Map<String, ServerInstance> enabledServers = _routingManager.getEnabledServerInstanceMap();
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            ColocatedPartitionInfo partitionInfo = partitionInfos[partitionId];
            Preconditions.checkState(partitionInfo != null, "Failed to find any segment for table: %s, partition: %s", tableName, partitionId);
            ServerInstance server = pickEnabledServer(partitionInfo._fullyReplicatedServers, enabledServers, indexToPick++);
            Preconditions.checkState(server != null, "Failed to find enabled fully replicated server for table: %s, partition: %s", tableName, partitionId);
            workerIdToServer.put(partitionId, new QueryServerInstance(server));
            workerIdToSegments.put(partitionId, getSegmentsMap(partitionInfo));
        }
        dispatchablePlanMetadata.setWorkerIdToServerInstanceMap(workerIdToServer);
        dispatchablePlanMetadata.setWorkerIdToSegmentsMap(workerIdToSegments);
        dispatchablePlanMetadata.setTimeBoundaryInfo(tableInfo._timeBoundaryInfo);
        dispatchablePlanMetadata.setPartitionedTableScan(true);
        dispatchablePlanMetadata.setPartitionParallelism(partitionParallelism);
    }

    /**
     * Assigns workers to an intermediate (non-leaf, non-root) fragment after first assigning workers
     * to its children.
     *
     * <ul>
     *   <li>If the first child is a partitioned table scan, the child's workers are reused: as-is for
     *       parallelism 1, otherwise each child worker is repeated {@code partitionParallelism} times
     *       under consecutive worker ids.</li>
     *   <li>Otherwise the candidate servers are the enabled servers serving any table of the query,
     *       or all enabled servers when no serving instance is found. A fragment requiring a
     *       singleton instance gets one randomly chosen candidate; any other fragment gets every
     *       candidate repeated {@code stageParallelism} (query option, default 1) times.</li>
     * </ul>
     *
     * @throws IllegalStateException if no candidate server is available
     */
    private void assignWorkersToIntermediateFragment(PlanFragment planFragment, DispatchablePlanContext dispatchablePlanContext) {
        List<PlanFragment> childFragments = planFragment.getChildren();
        for (PlanFragment child : childFragments) {
            assignWorkersToNonRootFragment(child, dispatchablePlanContext);
        }

        Map<Integer, DispatchablePlanMetadata> metadataByFragmentId = dispatchablePlanContext.getDispatchablePlanMetadataMap();
        DispatchablePlanMetadata fragmentMetadata = metadataByFragmentId.get(planFragment.getFragmentId());

        // Partitioned scan below: mirror (and optionally fan out) the child's worker layout.
        if (!childFragments.isEmpty()) {
            DispatchablePlanMetadata firstChildMetadata = metadataByFragmentId.get(childFragments.get(0).getFragmentId());
            if (firstChildMetadata.isPartitionedTableScan()) {
                int partitionParallelism = firstChildMetadata.getPartitionParallelism();
                Map<Integer, QueryServerInstance> childWorkers = firstChildMetadata.getWorkerIdToServerInstanceMap();
                if (partitionParallelism == 1) {
                    fragmentMetadata.setWorkerIdToServerInstanceMap(childWorkers);
                    return;
                }
                int numChildWorkers = childWorkers.size();
                Map<Integer, QueryServerInstance> fannedOutWorkers = new HashMap<>();
                int nextWorkerId = 0;
                for (int childWorkerId = 0; childWorkerId < numChildWorkers; childWorkerId++) {
                    QueryServerInstance server = childWorkers.get(childWorkerId);
                    for (int copy = 0; copy < partitionParallelism; copy++) {
                        fannedOutWorkers.put(nextWorkerId++, server);
                    }
                }
                fragmentMetadata.setWorkerIdToServerInstanceMap(fannedOutWorkers);
                return;
            }
        }

        Set<String> tableNames = dispatchablePlanContext.getTableNames();
        Map<String, ServerInstance> enabledServers = _routingManager.getEnabledServerInstanceMap();

        // Collect the ids of all instances serving any table of the query (none if there are no tables).
        Set<String> servingInstanceIds = new HashSet<>();
        for (String tableName : tableNames) {
            if (TableNameBuilder.getTableTypeFromTableName(tableName) != null) {
                Set<String> typedInstances = _routingManager.getServingInstances(tableName);
                if (typedInstances != null) {
                    servingInstanceIds.addAll(typedInstances);
                }
                continue;
            }
            Set<String> offlineInstances = _routingManager.getServingInstances(TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
            if (offlineInstances != null) {
                servingInstanceIds.addAll(offlineInstances);
            }
            Set<String> realtimeInstances = _routingManager.getServingInstances(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
            if (realtimeInstances != null) {
                servingInstanceIds.addAll(realtimeInstances);
            }
        }

        List<ServerInstance> candidateServers;
        if (servingInstanceIds.isEmpty()) {
            // No table or no serving instance known: fall back to every enabled server.
            candidateServers = new ArrayList<>(enabledServers.values());
        } else {
            candidateServers = new ArrayList<>(servingInstanceIds.size());
            for (String instanceId : servingInstanceIds) {
                ServerInstance server = enabledServers.get(instanceId);
                if (server != null) {
                    candidateServers.add(server);
                }
            }
        }

        if (candidateServers.isEmpty()) {
            LOGGER.error("[RequestId: {}] No server instance found for intermediate stage for tables: {}", dispatchablePlanContext.getRequestId(), tableNames);
            throw new IllegalStateException("No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
        }

        if (fragmentMetadata.isRequiresSingletonInstance()) {
            ServerInstance chosen = candidateServers.get(RANDOM.nextInt(candidateServers.size()));
            fragmentMetadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0, new QueryServerInstance(chosen)));
            return;
        }

        int stageParallelism = Integer.parseInt(dispatchablePlanContext.getPlannerContext().getOptions().getOrDefault("stageParallelism", "1"));
        Map<Integer, QueryServerInstance> workerIdToServer = new HashMap<>();
        int workerId = 0;
        for (ServerInstance candidate : candidateServers) {
            // One QueryServerInstance per server, shared by all of that server's workers.
            QueryServerInstance queryServer = new QueryServerInstance(candidate);
            for (int copy = 0; copy < stageParallelism; copy++) {
                workerIdToServer.put(workerId++, queryServer);
            }
        }
        fragmentMetadata.setWorkerIdToServerInstanceMap(workerIdToServer);
    }

    /**
     * Builds the per-partition server/segment info for a partitioned table.
     *
     * <p>A type-suffixed name resolves to that type only. For a raw name, routing must exist for at
     * least one type; if only one exists (or the offline table has no time boundary) a single-type
     * result is returned. Otherwise offline and realtime partitions are merged: a partition present
     * in both keeps only the servers that are fully replicated for both.
     *
     * @param tableName table name, with or without type suffix
     * @param partitionKey partition key from the table hint
     * @param numPartitions number of partitions from the table hint
     * @throws IllegalStateException if no routing exists, the partition info does not match the
     *     hint, or a merged partition has no common fully replicated server
     */
    private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
        if (tableType != null) {
            if (tableType == TableType.OFFLINE) {
                return getOfflineColocatedTableInfo(tableName, partitionKey, numPartitions);
            }
            return getRealtimeColocatedTableInfo(tableName, partitionKey, numPartitions);
        }

        String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
        String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
        boolean offlineRoutingExists = _routingManager.routingExists(offlineTableName);
        boolean realtimeRoutingExists = _routingManager.routingExists(realtimeTableName);
        Preconditions.checkState(offlineRoutingExists || realtimeRoutingExists, "Routing doesn't exist for table: %s", tableName);

        if (!(offlineRoutingExists && realtimeRoutingExists)) {
            if (offlineRoutingExists) {
                return getOfflineColocatedTableInfo(offlineTableName, partitionKey, numPartitions);
            }
            return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey, numPartitions);
        }

        TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(offlineTableName);
        if (timeBoundaryInfo == null) {
            // Without a time boundary only the realtime part is used.
            return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey, numPartitions);
        }

        TablePartitionInfo.PartitionInfo[] offlinePartitions = getTablePartitionInfo(offlineTableName, partitionKey, numPartitions).getPartitionInfoMap();
        TablePartitionInfo.PartitionInfo[] realtimePartitions = getTablePartitionInfo(realtimeTableName, partitionKey, numPartitions).getPartitionInfoMap();
        ColocatedPartitionInfo[] mergedPartitions = new ColocatedPartitionInfo[numPartitions];
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            TablePartitionInfo.PartitionInfo offlinePartition = offlinePartitions[partitionId];
            TablePartitionInfo.PartitionInfo realtimePartition = realtimePartitions[partitionId];
            if (offlinePartition == null && realtimePartition == null) {
                continue;
            }
            if (offlinePartition == null) {
                mergedPartitions[partitionId] = new ColocatedPartitionInfo(realtimePartition._fullyReplicatedServers, null, realtimePartition._segments);
            } else if (realtimePartition == null) {
                mergedPartitions[partitionId] = new ColocatedPartitionInfo(offlinePartition._fullyReplicatedServers, offlinePartition._segments, null);
            } else {
                Set<String> commonServers = new HashSet<>(offlinePartition._fullyReplicatedServers);
                commonServers.retainAll(realtimePartition._fullyReplicatedServers);
                Preconditions.checkState(!commonServers.isEmpty(), "Failed to find fully replicated server for partition: %s in hybrid table: %s", partitionId, tableName);
                mergedPartitions[partitionId] = new ColocatedPartitionInfo(commonServers, offlinePartition._segments, realtimePartition._segments);
            }
        }
        return new ColocatedTableInfo(mergedPartitions, timeBoundaryInfo);
    }

    /**
     * Fetches the partition info of a type-suffixed table and validates it against the query hint.
     *
     * @param tableNameWithType table name including type suffix
     * @param partitionKey expected partition column
     * @param numPartitions expected number of partitions
     * @return the validated partition info
     * @throws IllegalStateException if the info is missing, the partition column or partition count
     *     does not match, or any segment has an invalid partition
     */
    private TablePartitionInfo getTablePartitionInfo(String tableNameWithType, String partitionKey, int numPartitions) {
        TablePartitionInfo partitionInfo = _routingManager.getTablePartitionInfo(tableNameWithType);
        Preconditions.checkState(partitionInfo != null, "Failed to find table partition info for table: %s", tableNameWithType);

        boolean columnMatches = partitionInfo.getPartitionColumn().equals(partitionKey);
        Preconditions.checkState(columnMatches, "Partition key: %s does not match partition column: %s for table: %s", partitionKey, partitionInfo.getPartitionColumn(), tableNameWithType);

        boolean sizeMatches = partitionInfo.getNumPartitions() == numPartitions;
        Preconditions.checkState(sizeMatches, "Partition size mismatch (hint: %s, table: %s) for table: %s", Integer.valueOf(numPartitions), Integer.valueOf(partitionInfo.getNumPartitions()), tableNameWithType);

        boolean allSegmentsValid = partitionInfo.getSegmentsWithInvalidPartition().isEmpty();
        Preconditions.checkState(allSegmentsValid, "Find %s segments with invalid partition for table: %s", partitionInfo.getSegmentsWithInvalidPartition().size(), tableNameWithType);
        return partitionInfo;
    }

    /**
     * Builds the partition info of an offline table: each present partition contributes its fully
     * replicated servers and its segments as offline segments. No time boundary is attached.
     *
     * @param offlineTableName offline table name including type suffix
     * @param partitionKey expected partition column
     * @param numPartitions expected number of partitions
     */
    private ColocatedTableInfo getOfflineColocatedTableInfo(String offlineTableName, String partitionKey, int numPartitions) {
        TablePartitionInfo.PartitionInfo[] sourcePartitions = getTablePartitionInfo(offlineTableName, partitionKey, numPartitions).getPartitionInfoMap();
        ColocatedPartitionInfo[] result = new ColocatedPartitionInfo[numPartitions];
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            TablePartitionInfo.PartitionInfo source = sourcePartitions[partitionId];
            if (source == null) {
                continue;
            }
            result[partitionId] = new ColocatedPartitionInfo(source._fullyReplicatedServers, source._segments, null);
        }
        return new ColocatedTableInfo(result, null);
    }

    /**
     * Builds the partition info of a realtime table: each present partition contributes its fully
     * replicated servers and its segments as realtime segments. No time boundary is attached.
     *
     * @param realtimeTableName realtime table name including type suffix
     * @param partitionKey expected partition column
     * @param numPartitions expected number of partitions
     */
    private ColocatedTableInfo getRealtimeColocatedTableInfo(String realtimeTableName, String partitionKey, int numPartitions) {
        TablePartitionInfo.PartitionInfo[] sourcePartitions = getTablePartitionInfo(realtimeTableName, partitionKey, numPartitions).getPartitionInfoMap();
        ColocatedPartitionInfo[] result = new ColocatedPartitionInfo[numPartitions];
        for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
            TablePartitionInfo.PartitionInfo source = sourcePartitions[partitionId];
            if (source == null) {
                continue;
            }
            result[partitionId] = new ColocatedPartitionInfo(source._fullyReplicatedServers, null, source._segments);
        }
        return new ColocatedTableInfo(result, null);
    }

    /**
     * Picks an enabled server among the candidates, deterministically for a given pick index.
     *
     * <p>The candidates are sorted, and starting from position {@code indexToPick mod size} (sign bit
     * masked off) the first candidate present in the enabled-server map is returned, wrapping around.
     * The former shuffle of a throw-away array copy had no effect on the result and was removed.
     *
     * @param candidates ids of the candidate servers
     * @param enabledServerInstanceMap enabled servers keyed by id
     * @param indexToPick index used to choose the starting candidate
     * @return an enabled candidate, or {@code null} if there is none
     */
    @Nullable
    private static ServerInstance pickEnabledServer(Set<String> candidates, Map<String, ServerInstance> enabledServerInstanceMap, long indexToPick) {
        int numCandidates = candidates.size();
        if (numCandidates == 0) {
            return null;
        }
        if (numCandidates == 1) {
            return enabledServerInstanceMap.get(candidates.iterator().next());
        }
        // Sort so that the same index picks the same server regardless of set iteration order.
        List<String> sortedCandidates = new ArrayList<>(candidates);
        Collections.sort(sortedCandidates);
        // Mask the sign bit so a negative index still yields a non-negative start position.
        int startIndex = (int) ((indexToPick & Long.MAX_VALUE) % numCandidates);
        for (int offset = 0; offset < numCandidates; offset++) {
            ServerInstance serverInstance = enabledServerInstanceMap.get(sortedCandidates.get((startIndex + offset) % numCandidates));
            if (serverInstance != null) {
                return serverInstance;
            }
        }
        return null;
    }

    /**
     * Converts a partition's segment lists into a map keyed by table type name, leaving out any type
     * whose segment list is {@code null}.
     *
     * @param partitionInfo partition to convert
     * @return mutable map with at most one entry each for OFFLINE and REALTIME
     */
    private static Map<String, List<String>> getSegmentsMap(ColocatedPartitionInfo partitionInfo) {
        Map<String, List<String>> segmentsByType = new HashMap<>();
        List<String> offlineSegments = partitionInfo._offlineSegments;
        if (offlineSegments != null) {
            segmentsByType.put(TableType.OFFLINE.name(), offlineSegments);
        }
        List<String> realtimeSegments = partitionInfo._realtimeSegments;
        if (realtimeSegments != null) {
            segmentsByType.put(TableType.REALTIME.name(), realtimeSegments);
        }
        return segmentsByType;
    }
}
