package org.apache.pinot.plugin.minion.tasks.mergerollup;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.minion.MergeRollupTaskMetadata;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.mergerollup.segmentgroupmananger.MergeRollupTaskSegmentGroupManagerProvider;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TaskGenerator
/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.class */
public class MergeRollupTaskGenerator extends BaseTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
    // Used when a merge level config does not set "maxNumRecordsPerTask".
    private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50000000;
    // Used when a merge level config does not set "maxNumParallelBuckets".
    private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
    // Batch ingestion type that validate() rejects.
    private static final String REFRESH = "REFRESH";
    // Separator placed between the parts of a generated segment name prefix.
    private static final String DELIMITER_IN_SEGMENT_NAME = "_";
    // NOTE(review): presumably metric name parts; their use is not visible in this part of the file.
    private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets";
    private static final String MERGE_ROLLUP_TASK_NUM_BUCKETS_TO_PROCESS = "mergeRollupTaskNumBucketsToProcess";
    // Table name -> merge level -> watermark (ms) last reported by createOrUpdateDelayMetrics.
    private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new HashMap();
    // Table name -> largest valid bucket end time (ms) computed for the lowest merge level in generateTasks.
    private final Map<String, Long> _tableLowestLevelMaxValidBucketEndTimeMs = new HashMap();
    // Table name -> merge level -> value maintained by createOrUpdateNumBucketsToProcessMetrics.
    private final Map<String, Map<String, Long>> _tableNumberBucketsToProcess = new HashMap();

    /**
     * Returns the task type name handled by this generator.
     *
     * @return the constant string {@code "MergeRollupTask"}
     */
    public String getTaskType() {
        return "MergeRollupTask";
    }

    /**
     * Generates {@code MergeRollupTask} configs for every table in {@code list} that passes
     * {@link #validate(TableConfig, String)}.
     *
     * <p>For each table, the candidate segments are those that survive the status filter and the lineage filter,
     * contain at least one document and are allowed to be merged. Merge levels are processed from the smallest
     * bucket time period to the largest. A level is skipped when it still has an incomplete task or when its
     * bucket, buffer or parallelism settings are invalid. For each remaining level, time buckets that hold
     * unmerged segments are selected (at most {@code maxNumParallelBuckets}, and no further bucket once a selected
     * segment spans more than one bucket) and turned into task configs, grouped by partition when the table has a
     * segment partition config.
     *
     * <p>Unless the task runs in {@code processAll} mode, the per-level watermarks are written back to the task
     * metadata using the version that was read; when that write fails the table is skipped and none of its tasks
     * are returned.
     *
     * @param list table configs to generate tasks for
     * @return the task configs generated for all tables
     */
    public List<PinotTaskConfig> generateTasks(List<TableConfig> list) {
        final String taskType = "MergeRollupTask";
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
        for (TableConfig tableConfig : list) {
            if (!validate(tableConfig, taskType)) {
                continue;
            }
            String tableName = tableConfig.getTableName();
            LOGGER.info("Start generating task configs for table: {} for task: {}", tableName, taskType);

            // Narrow the segments down by status first, then by lineage.
            List<SegmentZKMetadata> segmentsByStatus = filterSegmentsBasedOnStatus(tableConfig.getTableType(), getSegmentsZKMetadataForTable(tableName));
            SegmentLineage segmentLineage = this._clusterInfoAccessor.getSegmentLineage(tableName);
            HashSet<String> lineageFilteredNames = new HashSet<>();
            for (SegmentZKMetadata segment : segmentsByStatus) {
                lineageFilteredNames.add(segment.getSegmentName());
            }
            SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(lineageFilteredNames, segmentLineage);

            // Keep only non-empty segments that are allowed to be merged.
            List<SegmentZKMetadata> candidateSegments = new ArrayList<>();
            for (SegmentZKMetadata segment : segmentsByStatus) {
                if (lineageFilteredNames.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0 && MergeTaskUtils.allowMerge(segment)) {
                    candidateSegments.add(segment);
                }
            }
            if (candidateSegments.isEmpty()) {
                resetDelayMetrics(tableName);
                LOGGER.info("Skip generating task: {} for table: {}, no segment is found.", taskType, tableName);
                continue;
            }

            // Order segments by start time, then end time, then name.
            candidateSegments.sort(Comparator.comparingLong(SegmentZKMetadata::getStartTimeMs)
                    .thenComparingLong(SegmentZKMetadata::getEndTimeMs)
                    .thenComparing(SegmentZKMetadata::getSegmentName));

            // Order merge levels by bucket time period, smallest first.
            Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
            List<Map.Entry<String, Map<String, String>>> sortedMergeLevelConfigs = new ArrayList<>(MergeRollupTaskUtils.getLevelToConfigMap(taskConfigs).entrySet());
            sortedMergeLevelConfigs.sort(Comparator.comparingLong(entry -> TimeUtils.convertPeriodToMillis(entry.getValue().get("bucketTimePeriod"))));
            List<String> sortedMergeLevels = sortedMergeLevelConfigs.stream().map(entry -> entry.getKey()).collect(Collectors.toList());

            // Collect the merge levels that still have an incomplete task.
            HashSet<String> incompleteMergeLevels = new HashSet<>();
            for (String incompleteTaskName : TaskGeneratorUtils.getIncompleteTasks(taskType, tableName, this._clusterInfoAccessor).keySet()) {
                for (PinotTaskConfig incompleteTaskConfig : this._clusterInfoAccessor.getTaskConfigs(incompleteTaskName)) {
                    incompleteMergeLevels.add(incompleteTaskConfig.getConfigs().get("mergeLevel"));
                }
            }

            boolean processAll = "processAll".equalsIgnoreCase(taskConfigs.get("mode"));
            ZNRecord metadataZNRecord = this._clusterInfoAccessor.getMinionTaskMetadataZNRecord(taskType, tableName);
            int expectedVersion = metadataZNRecord != null ? metadataZNRecord.getVersion() : -1;
            MergeRollupTaskMetadata taskMetadata = metadataZNRecord != null
                    ? MergeRollupTaskMetadata.fromZNRecord(metadataZNRecord)
                    : new MergeRollupTaskMetadata(tableName, new TreeMap<>());
            List<PinotTaskConfig> tableTaskConfigs = new ArrayList<>();

            String mergeLevel = null;
            for (Map.Entry<String, Map<String, String>> mergeLevelConfig : sortedMergeLevelConfigs) {
                // The previous level in sort order is the "lower" level, even if that level was skipped.
                String lowerMergeLevel = mergeLevel;
                mergeLevel = mergeLevelConfig.getKey();
                Map<String, String> mergeConfigs = mergeLevelConfig.getValue();

                if (incompleteMergeLevels.contains(mergeLevel)) {
                    LOGGER.info("Found incomplete task of merge level: {} for the same table: {}, Skipping task generation: {}", mergeLevel, tableName, taskType);
                    continue;
                }

                String bucketPeriod = mergeConfigs.get("bucketTimePeriod");
                long bucketMs = TimeUtils.convertPeriodToMillis(bucketPeriod);
                if (bucketMs <= 0) {
                    LOGGER.error("Bucket time period: {} (table : {}, mergeLevel : {}) must be larger than 0", bucketPeriod, tableName, mergeLevel);
                    continue;
                }
                String bufferPeriod = mergeConfigs.get("bufferTimePeriod");
                long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
                if (bufferMs < 0) {
                    LOGGER.error("Buffer time period: {} (table : {}, mergeLevel : {}) must be larger or equal to 0", bufferPeriod, tableName, mergeLevel);
                    continue;
                }
                String maxNumParallelBucketsStr = mergeConfigs.get("maxNumParallelBuckets");
                int maxNumParallelBuckets = maxNumParallelBucketsStr != null ? Integer.parseInt(maxNumParallelBucketsStr) : DEFAULT_NUM_PARALLEL_BUCKETS;
                if (maxNumParallelBuckets <= 0) {
                    LOGGER.error("Maximum number of parallel buckets: {} (table : {}, mergeLevel : {}) must be larger than 0", maxNumParallelBuckets, tableName, mergeLevel);
                    continue;
                }

                // The first bucket starts at the earliest segment (processAll) or at the stored watermark.
                long firstSegmentStartMs = candidateSegments.get(0).getStartTimeMs();
                long bucketStartMs = (firstSegmentStartMs / bucketMs) * bucketMs;
                long watermarkMs = 0;
                if (!processAll) {
                    watermarkMs = getWatermarkMs(firstSegmentStartMs, bucketMs, mergeLevel, taskMetadata);
                    bucketStartMs = watermarkMs;
                }
                long bucketEndMs = bucketStartMs + bucketMs;

                if (lowerMergeLevel == null) {
                    // Lowest level: remember the largest valid bucket end time across all candidate segments.
                    long maxValidBucketEndMs = Long.MIN_VALUE;
                    for (SegmentZKMetadata segment : candidateSegments) {
                        maxValidBucketEndMs = Math.max(maxValidBucketEndMs, getValidBucketEndTimeMsForSegment(segment, bucketMs, bufferMs));
                    }
                    this._tableLowestLevelMaxValidBucketEndTimeMs.put(tableName, maxValidBucketEndMs);
                }

                // Publish metrics even when no task ends up being scheduled for this level.
                if (processAll) {
                    createOrUpdateNumBucketsToProcessMetrics(tableName, mergeLevel, lowerMergeLevel, bufferMs, bucketMs, candidateSegments, sortedMergeLevels);
                } else {
                    createOrUpdateDelayMetrics(tableName, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
                }

                if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, taskMetadata, processAll)) {
                    LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}, mode : {}) cannot be merged yet", bucketStartMs, bucketEndMs, tableName, mergeLevel, processAll ? "processAll" : "processFromWatermark");
                    continue;
                }

                // Walk the sorted segments and collect, per bucket, the segments overlapping that bucket.
                List<List<SegmentZKMetadata>> selectedBuckets = new ArrayList<>(maxNumParallelBuckets);
                List<SegmentZKMetadata> currentBucket = new ArrayList<>();
                boolean hasUnmergedSegments = false;
                boolean hasSpilledOverSegment = false;
                boolean allSegmentsReadyToMerge = true;
                for (SegmentZKMetadata segment : candidateSegments) {
                    long segmentStartMs = segment.getStartTimeMs();
                    if (segmentStartMs < bucketEndMs) {
                        if (segment.getEndTimeMs() >= bucketStartMs) {
                            // Segment overlaps the current bucket.
                            if (!isMergedSegment(segment, mergeLevel, sortedMergeLevels)) {
                                hasUnmergedSegments = true;
                            }
                            if (!isMergedSegment(segment, lowerMergeLevel, sortedMergeLevels)) {
                                allSegmentsReadyToMerge = false;
                            }
                            if (hasSpilledOverData(segment, bucketMs)) {
                                hasSpilledOverSegment = true;
                            }
                            currentBucket.add(segment);
                        }
                        // Otherwise the segment ends before the bucket starts; keep scanning.
                        continue;
                    }

                    // The segment starts after the current bucket: close the bucket and open the next one.
                    if (hasUnmergedSegments && allSegmentsReadyToMerge) {
                        selectedBuckets.add(currentBucket);
                    }
                    if (selectedBuckets.size() == maxNumParallelBuckets || hasSpilledOverSegment) {
                        break;
                    }
                    currentBucket = new ArrayList<>();
                    // Reset both flags BEFORE the validity check below. Leaving hasUnmergedSegments at the
                    // previous bucket's value would let the post-loop check append this still-empty bucket.
                    hasUnmergedSegments = false;
                    allSegmentsReadyToMerge = true;
                    bucketStartMs = (segmentStartMs / bucketMs) * bucketMs;
                    bucketEndMs = bucketStartMs + bucketMs;
                    if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, taskMetadata, processAll)) {
                        break;
                    }
                    if (!isMergedSegment(segment, mergeLevel, sortedMergeLevels)) {
                        hasUnmergedSegments = true;
                    }
                    if (!isMergedSegment(segment, lowerMergeLevel, sortedMergeLevels)) {
                        allSegmentsReadyToMerge = false;
                    }
                    if (hasSpilledOverData(segment, bucketMs)) {
                        hasSpilledOverSegment = true;
                    }
                    currentBucket.add(segment);
                }
                // Add the last bucket when it qualifies and was not already added (identity comparison on purpose).
                if (hasUnmergedSegments && allSegmentsReadyToMerge
                        && (selectedBuckets.isEmpty() || selectedBuckets.get(selectedBuckets.size() - 1) != currentBucket)) {
                    selectedBuckets.add(currentBucket);
                }
                if (selectedBuckets.isEmpty()) {
                    LOGGER.info("No unmerged segment found for table: {}, mergeLevel: {}", tableName, mergeLevel);
                    continue;
                }

                // Move the watermark to the start of the bucket holding the first selected segment.
                long newWatermarkMs = (selectedBuckets.get(0).get(0).getStartTimeMs() / bucketMs) * bucketMs;
                taskMetadata.getWatermarkMap().put(mergeLevel, newWatermarkMs);
                LOGGER.info("Update watermark for table: {}, mergeLevel: {} from: {} to: {}", tableName, mergeLevel, watermarkMs, newWatermarkMs);
                if (!processAll) {
                    createOrUpdateDelayMetrics(tableName, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
                }

                String maxNumRecordsPerTaskStr = mergeConfigs.get("maxNumRecordsPerTask");
                int maxNumRecordsPerTask = maxNumRecordsPerTaskStr != null ? Integer.parseInt(maxNumRecordsPerTaskStr) : DEFAULT_MAX_NUM_RECORDS_PER_TASK;
                SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
                if (segmentPartitionConfig == null) {
                    // No partitioning configured: one set of tasks per bucket.
                    for (List<SegmentZKMetadata> bucketSegments : selectedBuckets) {
                        tableTaskConfigs.addAll(createPinotTaskConfigs(bucketSegments, tableConfig, maxNumRecordsPerTask, mergeLevel, null, mergeConfigs, taskConfigs));
                    }
                    continue;
                }

                // Partitioning configured: within each bucket, group segments by their partition id per column.
                Map<String, ?> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
                List<String> partitionColumns = new ArrayList<>(columnPartitionMap.keySet());
                for (List<SegmentZKMetadata> bucketSegments : selectedBuckets) {
                    Map<List<Integer>, List<SegmentZKMetadata>> segmentsByPartition = new HashMap<>();
                    List<SegmentZKMetadata> outlierSegments = new ArrayList<>();
                    for (SegmentZKMetadata segment : bucketSegments) {
                        SegmentPartitionMetadata partitionMetadata = segment.getPartitionMetadata();
                        List<Integer> partitions = new ArrayList<>();
                        if (partitionMetadata != null && columnPartitionMap.keySet().equals(partitionMetadata.getColumnPartitionMap().keySet())) {
                            for (String partitionColumn : partitionColumns) {
                                if (partitionMetadata.getPartitions(partitionColumn).size() != 1) {
                                    // A column with zero or several partitions makes the segment an outlier.
                                    partitions.clear();
                                    break;
                                }
                                partitions.add(partitionMetadata.getPartitions(partitionColumn).iterator().next());
                            }
                        }
                        if (partitions.isEmpty()) {
                            outlierSegments.add(segment);
                        } else {
                            segmentsByPartition.computeIfAbsent(partitions, key -> new ArrayList<>()).add(segment);
                        }
                    }
                    for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> partitionEntry : segmentsByPartition.entrySet()) {
                        tableTaskConfigs.addAll(createPinotTaskConfigs(partitionEntry.getValue(), tableConfig, maxNumRecordsPerTask, mergeLevel, partitionEntry.getKey(), mergeConfigs, taskConfigs));
                    }
                    if (!outlierSegments.isEmpty()) {
                        tableTaskConfigs.addAll(createPinotTaskConfigs(outlierSegments, tableConfig, maxNumRecordsPerTask, mergeLevel, null, mergeConfigs, taskConfigs));
                    }
                }
            }

            if (!processAll) {
                try {
                    this._clusterInfoAccessor.setMinionTaskMetadata(taskMetadata, taskType, expectedVersion);
                } catch (ZkException e) {
                    LOGGER.error("Version changed while updating merge/rollup task metadata for table: {}, skip scheduling. There are multiple task schedulers for the same table, need to investigate!", tableName, e);
                    // The watermark was not persisted, so the tasks generated for this table must not be scheduled.
                    continue;
                }
            }
            pinotTaskConfigs.addAll(tableTaskConfigs);
            LOGGER.info("Finished generating task configs for table: {} for task: {}, numTasks: {}", tableName, taskType, tableTaskConfigs.size());
        }
        cleanUpDelayMetrics(list);
        return pinotTaskConfigs;
    }

    /**
     * Filters segments by completion status for REALTIME tables; other table types are returned unchanged.
     *
     * <p>For a REALTIME table, the earliest start time among non-completed segments that contain documents is
     * used as a cut-off: only completed segments starting strictly before it are kept. When there is no such
     * non-completed segment, every completed segment is kept.
     *
     * @param tableType type of the table the segments belong to
     * @param list      metadata of all segments of the table
     * @return {@code list} itself for non-REALTIME tables, otherwise a new filtered list
     */
    @VisibleForTesting
    static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType, List<SegmentZKMetadata> list) {
        if (tableType != TableType.REALTIME) {
            return list;
        }
        long earliestIncompleteStartMs = Long.MAX_VALUE;
        for (SegmentZKMetadata segment : list) {
            if (segment.getStatus().isCompleted() || segment.getTotalDocs() <= 0) {
                continue;
            }
            earliestIncompleteStartMs = Math.min(earliestIncompleteStartMs, segment.getStartTimeMs());
        }
        List<SegmentZKMetadata> completedBeforeCutoff = new ArrayList<>();
        for (SegmentZKMetadata segment : list) {
            if (segment.getStatus().isCompleted() && segment.getStartTimeMs() < earliestIncompleteStartMs) {
                completedBeforeCutoff.add(segment);
            }
        }
        return completedBeforeCutoff;
    }

    /**
     * Checks whether tasks of the given type may be generated for a table.
     *
     * <p>Tables whose batch segment ingestion type is REFRESH are rejected, as are REALTIME tables with upsert
     * or dedup enabled. Each rejection is logged at WARN level.
     *
     * @param tableConfig config of the table to check
     * @param str         task type, used only in the log messages
     * @return {@code true} when the table is supported, {@code false} otherwise
     */
    @VisibleForTesting
    static boolean validate(TableConfig tableConfig, String str) {
        String tableName = tableConfig.getTableName();
        String ingestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
        if (REFRESH.equalsIgnoreCase(ingestionType)) {
            LOGGER.warn("Skip generating task: {} for non-APPEND table: {}, REFRESH table is not supported", str, tableName);
            return false;
        }
        if (tableConfig.getTableType() == TableType.REALTIME) {
            if (tableConfig.isUpsertEnabled()) {
                LOGGER.warn("Skip generating task: {} for table: {}, table with upsert enabled is not supported", str, tableName);
                return false;
            }
            if (tableConfig.isDedupEnabled()) {
                LOGGER.warn("Skip generating task: {} for table: {}, table with dedup enabled is not supported", str, tableName);
                return false;
            }
        }
        return true;
    }

    /**
     * Computes the latest bucket end time up to which the given segment can currently be merged.
     *
     * @param segmentZKMetadata segment to inspect
     * @param j                 bucket size in milliseconds
     * @param j2                buffer in milliseconds subtracted from the current time
     * @return {@link Long#MIN_VALUE} when the bucket containing the segment start ends after
     *         {@code now - buffer}; otherwise the smaller of the end of the bucket containing the segment end
     *         time and {@code now - buffer} rounded down to a bucket boundary
     */
    private long getValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long j, long j2) {
        long mergeableUntilMs = System.currentTimeMillis() - j2;
        long firstBucketEndMs = ((segmentZKMetadata.getStartTimeMs() / j) * j) + j;
        if (firstBucketEndMs > mergeableUntilMs) {
            return Long.MIN_VALUE;
        }
        long segmentEndBucketEndMs = ((segmentZKMetadata.getEndTimeMs() / j) + 1) * j;
        long lastMergeableBucketEndMs = (mergeableUntilMs / j) * j;
        return Math.min(segmentEndBucketEndMs, lastMergeableBucketEndMs);
    }

    /**
     * Tells whether a segment's start and end times fall into different buckets.
     *
     * @param segmentZKMetadata segment to inspect
     * @param j                 bucket size in milliseconds
     * @return {@code true} when the bucket index of the start time is smaller than that of the end time
     */
    private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long j) {
        long startBucketIndex = segmentZKMetadata.getStartTimeMs() / j;
        long endBucketIndex = segmentZKMetadata.getEndTimeMs() / j;
        return startBucketIndex < endBucketIndex;
    }

    /**
     * Tells whether a segment has already been merged at the given merge level or at a later one.
     *
     * <p>The segment's own level is read from the {@code "MergeRollupTask.mergeLevel"} entry of its custom map.
     * The segment counts as merged when that level appears in {@code list} at or after the position of
     * {@code str} (levels are compared case-insensitively).
     *
     * @param segmentZKMetadata segment to inspect
     * @param str               merge level to check against; {@code null} always yields {@code true}
     * @param list              merge levels in the order used for the comparison
     * @return {@code true} when the segment is considered merged for {@code str}
     */
    private boolean isMergedSegment(SegmentZKMetadata segmentZKMetadata, String str, List<String> list) {
        Map<String, String> customMap = segmentZKMetadata.getCustomMap();
        if (str == null) {
            return true;
        }
        if (customMap == null) {
            return false;
        }
        String segmentMergeLevel = customMap.get("MergeRollupTask.mergeLevel");
        if (segmentMergeLevel == null) {
            return false;
        }
        // Only levels at or after the requested one may match.
        boolean reachedRequestedLevel = false;
        for (String level : list) {
            if (level.equalsIgnoreCase(str)) {
                reachedRequestedLevel = true;
            }
            if (reachedRequestedLevel && level.equalsIgnoreCase(segmentMergeLevel)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Decides whether a bucket ending at {@code j} may be merged now.
     *
     * @param j                       bucket end time in milliseconds
     * @param j2                      buffer in milliseconds subtracted from the current time
     * @param str                     lower merge level, or {@code null} when there is none
     * @param mergeRollupTaskMetadata metadata holding the per-level watermarks
     * @param z                       whether the watermark check is bypassed
     * @return {@code false} when the bucket ends after {@code now - buffer}; otherwise {@code true} when there
     *         is no lower level or {@code z} is set; otherwise whether the lower level has a watermark that is
     *         at or after the bucket end
     */
    private boolean isValidBucketEndTime(long j, long j2, @Nullable String str, MergeRollupTaskMetadata mergeRollupTaskMetadata, boolean z) {
        long latestMergeableMs = System.currentTimeMillis() - j2;
        if (j > latestMergeableMs) {
            return false;
        }
        if (z || str == null) {
            return true;
        }
        Long lowerLevelWatermarkMs = (Long) mergeRollupTaskMetadata.getWatermarkMap().get(str);
        if (lowerLevelWatermarkMs == null) {
            return false;
        }
        return j <= lowerLevelWatermarkMs.longValue();
    }

    /**
     * Returns the watermark for a merge level.
     *
     * @param j                       fallback time in milliseconds
     * @param j2                      bucket size in milliseconds
     * @param str                     merge level to look up
     * @param mergeRollupTaskMetadata metadata holding the per-level watermarks
     * @return the stored watermark for {@code str}, or {@code j} rounded down to a multiple of {@code j2}
     *         when none is stored
     */
    private long getWatermarkMs(long j, long j2, String str, MergeRollupTaskMetadata mergeRollupTaskMetadata) {
        Long storedWatermarkMs = (Long) mergeRollupTaskMetadata.getWatermarkMap().get(str);
        if (storedWatermarkMs != null) {
            return storedWatermarkMs.longValue();
        }
        return (j / j2) * j2;
    }

    /**
     * Builds the task configs for one set of selected segments.
     *
     * <p>The segments are first split into groups by the segment group manager. Within each group, segments are
     * packed into tasks in order: a task is closed as soon as its accumulated document count reaches {@code i}
     * or the group is exhausted.
     *
     * @param list        segments selected for merging
     * @param tableConfig config of the table the segments belong to
     * @param i           maximum number of records per task
     * @param str         merge level the tasks are generated for
     * @param list2       partition ids appended to the segment name prefix, or {@code null} for none
     * @param map         configs of the merge level
     * @param map2        configs of the task type
     * @return one task config per generated task
     */
    private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> list, TableConfig tableConfig, int i, String str, List<Integer> list2, Map<String, String> map, Map<String, String> map2) {
        String tableName = tableConfig.getTableName();
        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
        List<List<SegmentZKMetadata>> segmentGroups = MergeRollupTaskSegmentGroupManagerProvider.create(map2).getSegmentGroups(tableConfig, this._clusterInfoAccessor, list);

        // "_<partitionId>" per partition id. It keeps prefixes distinct when this method is called several times
        // within the same millisecond for different partitions.
        StringBuilder partitionSuffixBuilder = new StringBuilder();
        if (list2 != null) {
            for (Integer partitionId : list2) {
                partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(partitionId.intValue());
            }
        }
        String partitionSuffix = partitionSuffixBuilder.toString();

        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
        for (List<SegmentZKMetadata> segmentGroup : segmentGroups) {
            // Pack the group's segments into tasks bounded by the record limit.
            List<List<String>> segmentNamesPerTask = new ArrayList<>();
            List<List<String>> downloadUrlsPerTask = new ArrayList<>();
            List<String> segmentNames = new ArrayList<>();
            List<String> downloadUrls = new ArrayList<>();
            // long accumulator: summing document counts in an int could overflow.
            long numRecordsInTask = 0;
            int lastIndex = segmentGroup.size() - 1;
            for (int segmentIndex = 0; segmentIndex <= lastIndex; segmentIndex++) {
                SegmentZKMetadata segment = segmentGroup.get(segmentIndex);
                segmentNames.add(segment.getSegmentName());
                downloadUrls.add(segment.getDownloadUrl());
                numRecordsInTask += segment.getTotalDocs();
                if (numRecordsInTask >= i || segmentIndex == lastIndex) {
                    segmentNamesPerTask.add(segmentNames);
                    downloadUrlsPerTask.add(downloadUrls);
                    numRecordsInTask = 0;
                    segmentNames = new ArrayList<>();
                    downloadUrls = new ArrayList<>();
                }
            }

            for (int taskIndex = 0; taskIndex < segmentNamesPerTask.size(); taskIndex++) {
                String downloadUrl = StringUtils.join(downloadUrlsPerTask.get(taskIndex), ",");
                Map<String, String> configs = MinionTaskUtils.getPushTaskConfig(tableName, map2, this._clusterInfoAccessor);
                configs.putAll(getBaseTaskConfigs(tableConfig, segmentNamesPerTask.get(taskIndex)));
                configs.put("downloadURL", downloadUrl);
                configs.put("uploadURL", this._clusterInfoAccessor.getVipUrl() + "/segments");
                configs.put("enableReplaceSegments", "true");
                // Forward every per-column aggregation type from the task-type configs.
                for (Map.Entry<String, String> taskConfigEntry : map2.entrySet()) {
                    if (taskConfigEntry.getKey().endsWith(".aggregationType")) {
                        configs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue());
                    }
                }
                configs.put("overwriteOutput", map2.getOrDefault("overwriteOutput", "false"));
                configs.put("mergeType", map.get("mergeType"));
                configs.put("mergeLevel", str);
                configs.put("partitionBucketTimePeriod", map.get("bucketTimePeriod"));
                configs.put("roundBucketTimePeriod", map.get("roundBucketTimePeriod"));
                configs.put("maxNumRecordsPerSegment", map.get("maxNumRecordsPerSegment"));
                // merged_<level>_<timestamp><partitionSuffix>_<taskIndex>_<rawTableName>
                configs.put("segmentNamePrefix", "merged_" + str + DELIMITER_IN_SEGMENT_NAME + System.currentTimeMillis() + partitionSuffix + DELIMITER_IN_SEGMENT_NAME + taskIndex + DELIMITER_IN_SEGMENT_NAME + rawTableName);
                pinotTaskConfigs.add(new PinotTaskConfig("MergeRollupTask", configs));
            }
        }
        return pinotTaskConfigs;
    }

    /**
     * Computes the task delay expressed as a number of time buckets.
     *
     * @param j  watermark in milliseconds; {@code -1} yields a delay of 0
     * @param j2 upper bound in milliseconds; {@link Long#MIN_VALUE} yields a delay of 0
     * @param j3 buffer in milliseconds subtracted from the current time
     * @param j4 bucket size in milliseconds
     * @return {@code (min(now - j3, j2) - j) / j4}, or 0 for the sentinel inputs above
     */
    private long getMergeRollupTaskDelayInNumTimeBuckets(long j, long j2, long j3, long j4) {
        boolean delayUnknown = j == -1 || j2 == Long.MIN_VALUE;
        if (delayUnknown) {
            return 0L;
        }
        long upperBoundMs = Math.min(System.currentTimeMillis() - j3, j2);
        return (upperBoundMs - j) / j4;
    }

    /**
     * Records the watermark of a merge level and, the first time the level is seen for the table, registers a
     * callback gauge reporting the task delay in number of time buckets.
     *
     * <p>Does nothing when no controller metrics are available.
     *
     * @param str  table name
     * @param str2 merge level whose watermark is recorded
     * @param str3 lower merge level whose recorded watermark bounds the delay, or {@code null} to use the
     *             table's lowest-level maximum valid bucket end time instead
     * @param j    watermark in milliseconds
     * @param j2   buffer in milliseconds
     * @param j3   bucket size in milliseconds
     */
    private void createOrUpdateDelayMetrics(String str, String str2, String str3, long j, long j2, long j3) {
        ControllerMetrics controllerMetrics = this._clusterInfoAccessor.getControllerMetrics();
        if (controllerMetrics == null) {
            return;
        }
        Map<String, Long> watermarksForTable = this._mergeRollupWatermarks.computeIfAbsent(str, table -> new ConcurrentHashMap<>());
        watermarksForTable.compute(str2, (level, previousWatermarkMs) -> {
            if (previousWatermarkMs == null) {
                // First watermark for this level: log it and register the gauge.
                LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}.(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})",
                        str, str2, j, j2, j3,
                        getMergeRollupTaskDelayInNumTimeBuckets(j, lookupDelayUpperBoundMs(str, str3, watermarksForTable), j2, j3));
                // The gauge re-reads the maps on every poll so it follows later watermark updates.
                controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(str, str2), () -> {
                    return Long.valueOf(getMergeRollupTaskDelayInNumTimeBuckets(watermarksForTable.getOrDefault(level, -1L), lookupDelayUpperBoundMs(str, str3, watermarksForTable), j2, j3));
                });
            }
            return j;
        });
    }

    /**
     * Looks up the upper bound used for the delay computation, returning {@link Long#MIN_VALUE} (which the delay
     * computation treats as "no delay") instead of failing when no value has been recorded yet.
     */
    private long lookupDelayUpperBoundMs(String tableName, @Nullable String lowerMergeLevel, Map<String, Long> watermarksForTable) {
        Long upperBoundMs = lowerMergeLevel == null
                ? this._tableLowestLevelMaxValidBucketEndTimeMs.get(tableName)
                : watermarksForTable.get(lowerMergeLevel);
        return upperBoundMs != null ? upperBoundMs.longValue() : Long.MIN_VALUE;
    }

    /**
     * Counts the time buckets that still need processing for a merge level and publishes that count as a gauge.
     *
     * <p>Segments are grouped into buckets aligned to multiples of {@code j2}. A bucket is counted when
     * {@code isMergedSegment} reports at least one of its segments as not merged for {@code str2} and reports
     * every one of its segments as merged for {@code str3}. Scanning stops at the first bucket whose end is later
     * than the current time minus {@code j}. Does nothing when no controller metrics are available.
     *
     * @param str table name; key into the bucket-count map and part of the gauge name
     * @param str2 merge level the count is computed for
     * @param str3 lower merge level passed to {@code isMergedSegment} for the readiness check
     * @param j buffer time in milliseconds
     * @param j2 bucket time in milliseconds
     * @param list segments to scan; the first element seeds the first bucket, so the list must be non-empty
     *     (presumably sorted by start time; verify against the caller)
     * @param list2 forwarded unchanged to {@code isMergedSegment}
     */
    private void createOrUpdateNumBucketsToProcessMetrics(String str, String str2, String str3, long j, long j2, List<SegmentZKMetadata> list, List<String> list2) {
        ControllerMetrics controllerMetrics = this._clusterInfoAccessor.getControllerMetrics();
        if (controllerMetrics == null) {
            return;
        }
        List<List<SegmentZKMetadata>> bucketsToProcess = new ArrayList<>();
        List<SegmentZKMetadata> currentBucket = new ArrayList<>();
        long bucketStartMs = (list.get(0).getStartTimeMs() / j2) * j2;
        long bucketEndMs = bucketStartMs + j2;
        boolean hasUnmergedSegments = false;
        boolean allSegmentsReadyToMerge = true;
        for (SegmentZKMetadata segment : list) {
            long segmentStartMs = segment.getStartTimeMs();
            if (segmentStartMs >= bucketEndMs) {
                // The segment starts past the current bucket: close that bucket and open the one containing it.
                if (hasUnmergedSegments && allSegmentsReadyToMerge) {
                    bucketsToProcess.add(currentBucket);
                }
                currentBucket = new ArrayList<>();
                // Reset both flags before the buffer check. Otherwise a stale "has unmerged" value from the
                // previous bucket would make the trailing check below count this new, still-empty bucket.
                hasUnmergedSegments = false;
                allSegmentsReadyToMerge = true;
                bucketStartMs = (segmentStartMs / j2) * j2;
                bucketEndMs = bucketStartMs + j2;
                if (bucketEndMs > System.currentTimeMillis() - j) {
                    // The new bucket still overlaps the buffer window; stop scanning.
                    break;
                }
                if (!isMergedSegment(segment, str2, list2)) {
                    hasUnmergedSegments = true;
                }
                if (!isMergedSegment(segment, str3, list2)) {
                    allSegmentsReadyToMerge = false;
                }
                currentBucket.add(segment);
            } else if (segment.getEndTimeMs() >= bucketStartMs) {
                // The segment overlaps the current bucket; segments ending before the bucket start are skipped.
                if (!isMergedSegment(segment, str2, list2)) {
                    hasUnmergedSegments = true;
                }
                if (!isMergedSegment(segment, str3, list2)) {
                    allSegmentsReadyToMerge = false;
                }
                currentBucket.add(segment);
            }
        }
        // Flush the last open bucket. The identity comparison guards against adding the same list instance twice.
        if (hasUnmergedSegments && allSegmentsReadyToMerge
                && (bucketsToProcess.isEmpty() || bucketsToProcess.get(bucketsToProcess.size() - 1) != currentBucket)) {
            bucketsToProcess.add(currentBucket);
        }
        Map<String, Long> numBucketsForTable =
                this._tableNumberBucketsToProcess.computeIfAbsent(str, table -> new ConcurrentHashMap<>());
        long size = bucketsToProcess.size();
        numBucketsForTable.compute(str2, (level, previousCount) -> {
            if (previousCount == null) {
                // No count was tracked for this level yet: register the gauge; later calls only update the value.
                LOGGER.info("Creating the gauge metric for tracking the merge/roll-up number buckets to process for table: {} and mergeLevel: {}.(bufferTimeMs={}, bucketTimeMs={}, numTimeBucketsToProcess={})", new Object[]{str, str2, Long.valueOf(j), Long.valueOf(j2), Long.valueOf(size)});
                controllerMetrics.setOrUpdateGauge(getMetricNameForNumBucketsToProcess(str, str2), () -> {
                    return this._tableNumberBucketsToProcess.get(str).getOrDefault(str2, Long.valueOf(size));
                });
            }
            return Long.valueOf(size);
        });
    }

    /**
     * Forgets everything tracked for a table: removes its watermark and bucket-count entries and unregisters the
     * gauge of every merge level found in them. Does nothing when no controller metrics are available.
     *
     * @param str table name whose tracked state and gauges are dropped
     */
    private void resetDelayMetrics(String str) {
        ControllerMetrics metrics = this._clusterInfoAccessor.getControllerMetrics();
        if (metrics == null) {
            return;
        }
        Map<String, Long> droppedWatermarks = this._mergeRollupWatermarks.remove(str);
        if (droppedWatermarks != null) {
            for (String mergeLevel : droppedWatermarks.keySet()) {
                metrics.removeGauge(getMetricNameForTaskDelay(str, mergeLevel));
            }
        }
        Map<String, Long> droppedBucketCounts = this._tableNumberBucketsToProcess.remove(str);
        if (droppedBucketCounts != null) {
            for (String mergeLevel : droppedBucketCounts.keySet()) {
                metrics.removeGauge(getMetricNameForNumBucketsToProcess(str, mergeLevel));
            }
        }
    }

    /**
     * Drops the tracked watermark and bucket count of a single merge level of a table and unregisters the
     * corresponding gauges. Other merge levels of the same table are left untouched. Does nothing when no
     * controller metrics are available.
     *
     * @param str table name
     * @param str2 merge level to stop tracking
     */
    private void resetDelayMetrics(String str, String str2) {
        ControllerMetrics controllerMetrics = this._clusterInfoAccessor.getControllerMetrics();
        if (controllerMetrics == null) {
            return;
        }
        Map<String, Long> watermarksForTable = this._mergeRollupWatermarks.get(str);
        if (watermarksForTable != null && watermarksForTable.remove(str2) != null) {
            controllerMetrics.removeGauge(getMetricNameForTaskDelay(str, str2));
        }
        // Look the table entry up with get(), not remove(). Removing the whole entry would discard the counts of
        // the table's other merge levels while their gauges stay registered, and the gauge callbacks registered
        // by createOrUpdateNumBucketsToProcessMetrics dereference this table entry.
        Map<String, Long> numBucketsForTable = this._tableNumberBucketsToProcess.get(str);
        if (numBucketsForTable != null && numBucketsForTable.remove(str2) != null) {
            controllerMetrics.removeGauge(getMetricNameForNumBucketsToProcess(str, str2));
        }
    }

    /**
     * Reconciles the tracked metrics with the given table configs.
     *
     * <p>For every table that currently has a watermark or bucket-count entry:
     * <ul>
     *   <li>if the table has no config in {@code list}, or this controller is not the leader for it, all of its
     *       metrics are reset;</li>
     *   <li>otherwise, each tracked merge level that is missing from the table's level-to-config map for this
     *       task type is reset individually.</li>
     * </ul>
     *
     * @param list table configs to reconcile against, keyed internally by {@code getTableName()}
     */
    private void cleanUpDelayMetrics(List<TableConfig> list) {
        Map<String, TableConfig> configsByTable = new HashMap<>();
        for (TableConfig config : list) {
            configsByTable.put(config.getTableName(), config);
        }
        // Iterate over a snapshot of the tracked table names, since resets remove entries from the live maps.
        HashSet<String> trackedTables = new HashSet<>(this._mergeRollupWatermarks.keySet());
        trackedTables.addAll(this._tableNumberBucketsToProcess.keySet());
        for (String table : trackedTables) {
            TableConfig config = configsByTable.get(table);
            boolean stillManagedHere = config != null
                    && this._clusterInfoAccessor.getLeaderControllerManager().isLeaderForTable(table);
            if (!stillManagedHere) {
                resetDelayMetrics(table);
                continue;
            }
            Map<String, Map<String, String>> configuredLevels =
                    MergeRollupTaskUtils.getLevelToConfigMap(config.getTaskConfig().getConfigsForTaskType(getTaskType()));
            Map<String, Long> watermarks = this._mergeRollupWatermarks.get(table);
            if (watermarks != null) {
                for (String level : watermarks.keySet()) {
                    if (!configuredLevels.containsKey(level)) {
                        resetDelayMetrics(table, level);
                    }
                }
            }
            Map<String, Long> bucketCounts = this._tableNumberBucketsToProcess.get(table);
            if (bucketCounts != null) {
                for (String level : bucketCounts.keySet()) {
                    if (!configuredLevels.containsKey(level)) {
                        resetDelayMetrics(table, level);
                    }
                }
            }
        }
    }

    /**
     * Builds the gauge name for the task-delay metric of a table and merge level.
     *
     * @param str table name
     * @param str2 merge level
     * @return {@code "mergeRollupTaskDelayInNumBuckets.<str>.<str2>"}
     */
    private String getMetricNameForTaskDelay(String str, String str2) {
        return String.join(".", "mergeRollupTaskDelayInNumBuckets", str, str2);
    }

    /**
     * Builds the gauge name for the number-of-buckets-to-process metric of a table and merge level.
     *
     * @param str table name
     * @param str2 merge level
     * @return {@code "mergeRollupTaskNumBucketsToProcess.<str>.<str2>"}
     */
    private String getMetricNameForNumBucketsToProcess(String str, String str2) {
        return String.join(".", "mergeRollupTaskNumBucketsToProcess", str, str2);
    }
}
