package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TaskGenerator
/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.class */
public class UpsertCompactMergeTaskGenerator extends BaseTaskGenerator {
    // Class-wide SLF4J logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskGenerator.class);
    // Fallback period string used by getCandidateSegments when the task config has no "bufferTimePeriod" entry.
    private static final String DEFAULT_BUFFER_PERIOD = "2d";
    // Fallback used by generateTasks when the task config has no "numSegmentsBatchPerServerRequest" entry.
    private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;

    /* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator$SegmentMergerMetadata.class */
    /**
     * Immutable value holder that pairs a segment's ZK metadata with the document counts and size
     * figure used when grouping segments for a compact-merge task.
     */
    public static class SegmentMergerMetadata {
        private final SegmentZKMetadata _segmentZKMetadata;
        private final long _validDocIds;
        private final long _invalidDocIds;
        private final double _segmentSizeInBytes;

        /**
         * @param segmentZKMetadata ZK metadata of the segment
         * @param j value exposed through {@link #getValidDocIds()}
         * @param j2 value exposed through {@link #getInvalidDocIds()}
         * @param d value exposed through {@link #getSegmentSizeInBytes()}
         */
        SegmentMergerMetadata(SegmentZKMetadata segmentZKMetadata, long j, long j2, double d) {
            _segmentZKMetadata = segmentZKMetadata;
            _validDocIds = j;
            _invalidDocIds = j2;
            _segmentSizeInBytes = d;
        }

        /** @return the ZK metadata this entry was built from */
        public SegmentZKMetadata getSegmentZKMetadata() {
            return _segmentZKMetadata;
        }

        /** @return the valid-document figure supplied at construction */
        public long getValidDocIds() {
            return _validDocIds;
        }

        /** @return the invalid-document figure supplied at construction */
        public long getInvalidDocIds() {
            return _invalidDocIds;
        }

        /** @return the segment size figure (in bytes) supplied at construction */
        public double getSegmentSizeInBytes() {
            return _segmentSizeInBytes;
        }
    }

    /* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator$SegmentSelectionResult.class */
    /**
     * Outcome of segment selection: the merge groups keyed by partition id, plus the names of
     * segments that were selected for deletion instead of merging.
     */
    public static class SegmentSelectionResult {
        private final Map<Integer, List<List<SegmentMergerMetadata>>> _segmentsForCompactMergeByPartition;
        private final List<String> _segmentsForDeletion;

        /**
         * @param map merge groups keyed by partition id; stored by reference (no defensive copy)
         * @param list names of segments to delete; stored by reference (no defensive copy)
         */
        SegmentSelectionResult(Map<Integer, List<List<SegmentMergerMetadata>>> map, List<String> list) {
            _segmentsForCompactMergeByPartition = map;
            _segmentsForDeletion = list;
        }

        /** @return the merge groups keyed by partition id, exactly as passed to the constructor */
        public Map<Integer, List<List<SegmentMergerMetadata>>> getSegmentsForCompactMergeByPartition() {
            return _segmentsForCompactMergeByPartition;
        }

        /** @return the names of segments to delete, exactly as passed to the constructor */
        public List<String> getSegmentsForDeletion() {
            return _segmentsForDeletion;
        }
    }

    /**
     * Identifies the task type handled by this generator.
     *
     * @return the constant task type name
     */
    public String getTaskType() {
        final String taskType = "UpsertCompactMergeTask";
        return taskType;
    }

    /**
     * Builds compact-merge task configs for each of the given tables.
     *
     * <p>A table is skipped when it has no task config, when incomplete tasks of this type already
     * exist for it, or when no segment passes {@link #getCandidateSegments}. Otherwise the valid-doc-id
     * metadata is fetched from the servers, segments selected for deletion are deleted, and at most
     * "tableMaxNumTasks" (default 1) tasks are created, one per partition, using the first merge
     * group of that partition.
     *
     * @param list table configs to generate tasks for
     * @return the generated task configs across all tables
     * @throws RuntimeException wrapping an {@link InvalidConfigException} raised while generating tasks
     */
    public List<PinotTaskConfig> generateTasks(List<TableConfig> list) {
        final String taskType = "UpsertCompactMergeTask";
        List<PinotTaskConfig> generatedTasks = new ArrayList<>();
        for (TableConfig tableConfig : list) {
            String tableNameWithType = tableConfig.getTableName();
            LOGGER.info("Start generating task configs for table: {}", tableNameWithType);

            if (tableConfig.getTaskConfig() == null) {
                LOGGER.warn("Task config is null for table: {}", tableNameWithType);
                continue;
            }

            Map<String, ?> pendingTasks = TaskGeneratorUtils.getIncompleteTasks(taskType, tableNameWithType, this._clusterInfoAccessor);
            if (!pendingTasks.isEmpty()) {
                LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.", pendingTasks.keySet(), tableNameWithType, taskType);
                continue;
            }

            Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
            List<SegmentZKMetadata> allSegments = this._clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
            List<SegmentZKMetadata> candidates = getCandidateSegments(taskConfigs, allSegments, System.currentTimeMillis());
            if (candidates.isEmpty()) {
                LOGGER.info("No segments were eligible for compactMerge task for table: {}", tableNameWithType);
                continue;
            }

            PinotHelixResourceManager resourceManager = this._clusterInfoAccessor.getPinotHelixResourceManager();
            Map<String, List<String>> serverToSegments = resourceManager.getServerToSegmentsMap(tableNameWithType);
            try {
                // The statements below keep the evaluation order of the single nested call they replace.
                Map<String, SegmentZKMetadata> candidatesByName = candidates.stream()
                        .collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity()));
                ServerSegmentMetadataReader metadataReader = new ServerSegmentMetadataReader(
                        this._clusterInfoAccessor.getExecutor(), this._clusterInfoAccessor.getConnectionManager());
                Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsBySegment =
                        metadataReader.getSegmentToValidDocIdsMetadataFromServer(
                                tableNameWithType,
                                serverToSegments,
                                resourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()),
                                (List<String>) null,
                                60000,
                                ValidDocIdsType.SNAPSHOT.toString(),
                                Integer.parseInt(taskConfigs.getOrDefault("numSegmentsBatchPerServerRequest",
                                        String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))));
                Set<String> alreadyMerged = getAlreadyMergedSegments(allSegments);
                SegmentSelectionResult selection = processValidDocIdsMetadata(
                        tableNameWithType, taskConfigs, candidatesByName, validDocIdsBySegment, alreadyMerged);

                if (!selection.getSegmentsForDeletion().isEmpty()) {
                    resourceManager.deleteSegments(tableNameWithType, selection.getSegmentsForDeletion(), "0d");
                    LOGGER.info("Deleted segments containing only invalid records for table: {}, number of segments to be deleted: {}", tableNameWithType, selection.getSegmentsForDeletion());
                }

                int numTasks = 0;
                int maxTasks = Integer.parseInt(taskConfigs.getOrDefault("tableMaxNumTasks", String.valueOf(1L)));
                for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> partitionEntry
                        : selection.getSegmentsForCompactMergeByPartition().entrySet()) {
                    if (numTasks == maxTasks) {
                        break;
                    }
                    List<List<SegmentMergerMetadata>> groups = partitionEntry.getValue();
                    if (groups.isEmpty() || groups.get(0).size() <= 1) {
                        continue;
                    }
                    // Only the first group of each partition is turned into a task.
                    List<SegmentMergerMetadata> firstGroup = groups.get(0);
                    List<String> segmentNames = firstGroup.stream()
                            .map(SegmentMergerMetadata::getSegmentZKMetadata)
                            .map(SegmentZKMetadata::getSegmentName)
                            .collect(Collectors.toList());
                    Map<String, String> taskConfig = new HashMap<>(getBaseTaskConfigs(tableConfig, segmentNames));
                    taskConfig.put("downloadURL", getDownloadUrl(firstGroup));
                    taskConfig.put("uploadURL", this._clusterInfoAccessor.getVipUrl() + "/segments");
                    taskConfig.put("crc", getSegmentCrcList(firstGroup));
                    long maxRecordsPerSegment =
                            Long.parseLong(taskConfigs.getOrDefault("maxNumRecordsPerSegment", String.valueOf(5000000L)));
                    taskConfig.put("maxNumRecordsPerSegment", String.valueOf(maxRecordsPerSegment));
                    generatedTasks.add(new PinotTaskConfig(taskType, taskConfig));
                    numTasks++;
                }
                LOGGER.info("Finished generating {} tasks configs for table: {}", numTasks, tableNameWithType);
            } catch (InvalidConfigException e) {
                throw new RuntimeException(e);
            }
        }
        return generatedTasks;
    }

    /**
     * Selects, from the valid-doc-id metadata reported by servers, the segments to delete and the
     * groups of segments to compact-merge per partition.
     *
     * <p>For every segment in {@code map3} that is also a candidate in {@code map2}, the replica
     * entries are scanned in order and the first usable one decides the outcome:
     * <ul>
     *   <li>a replica whose CRC differs from the ZK metadata CRC is skipped;</li>
     *   <li>if all docs are invalid the segment is marked for deletion;</li>
     *   <li>if the segment is listed in {@code set} (already merged) it is ignored;</li>
     *   <li>otherwise it is added to its partition's candidate list with an expected post-compaction
     *       size of {@code segmentSize * validDocs / totalDocs}.</li>
     * </ul>
     * A segment with no usable replica entry is left out of the result. Candidates of a partition
     * are ordered by creation time and packed greedily into groups bounded by the task configs
     * "maxNumRecordsPerSegment", "maxNumSegmentsPerTask", "maxNumRecordsPerTask" and
     * "outputSegmentMaxSize". Groups with a single segment are dropped, and the remaining groups are
     * ordered by total invalid docs (descending), then by group size (descending).
     *
     * @param str table name, used for logging only
     * @param map task configs for this task type
     * @param map2 candidate segments keyed by segment name
     * @param map3 per-segment list of valid-doc-id metadata (one entry per reporting replica)
     * @param set names of segments that were already merged
     * @return merge groups keyed by partition id and the names of segments to delete
     * @throws NumberFormatException if a numeric task config or a reported CRC cannot be parsed
     */
    @VisibleForTesting
    public static SegmentSelectionResult processValidDocIdsMetadata(String str, Map<String, String> map, Map<String, SegmentZKMetadata> map2, Map<String, List<ValidDocIdsMetadataInfo>> map3, Set<String> set) {
        final String taskType = "UpsertCompactMergeTask";
        long maxRecordsPerSegment = Long.parseLong(map.getOrDefault("maxNumRecordsPerSegment", String.valueOf(5000000L)));
        long maxRecordsPerTask = Long.parseLong(map.getOrDefault("maxNumRecordsPerTask", String.valueOf(50000000L)));
        long maxSegmentsPerTask = Long.parseLong(map.getOrDefault("maxNumSegmentsPerTask", String.valueOf(10L)));
        long maxOutputSizeInBytes = resolveOutputSegmentMaxSizeInBytes(str, map);

        Map<Integer, List<SegmentMergerMetadata>> candidatesByPartition = new HashMap<>();
        Set<String> segmentsForDeletion = new HashSet<>();

        for (Map.Entry<String, List<ValidDocIdsMetadataInfo>> segmentEntry : map3.entrySet()) {
            String segmentName = segmentEntry.getKey();
            if (!map2.containsKey(segmentName)) {
                LOGGER.debug("Segment {} is not found in the candidate segments list, skipping it for {}", segmentName, taskType);
                continue;
            }
            SegmentZKMetadata segmentZKMetadata = map2.get(segmentName);

            // Bounded iteration: once every replica entry has been inspected without a usable one
            // the segment is simply skipped (the previous unbounded loop never terminated here).
            for (ValidDocIdsMetadataInfo replicaInfo : segmentEntry.getValue()) {
                long totalInvalidDocs = replicaInfo.getTotalInvalidDocs();
                long totalValidDocs = replicaInfo.getTotalValidDocs();
                long segmentSizeInBytes = replicaInfo.getSegmentSizeInBytes();

                if (segmentZKMetadata.getCrc() != Long.parseLong(replicaInfo.getSegmentCrc())) {
                    LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", segmentName, segmentZKMetadata.getCrc(), replicaInfo.getSegmentCrc());
                    continue;
                }

                long totalDocs = replicaInfo.getTotalDocs();
                if (totalInvalidDocs == totalDocs) {
                    segmentsForDeletion.add(segmentName);
                    break;
                }
                if (set.contains(segmentName)) {
                    LOGGER.debug("Segment {} already merged. Skipping it for {}", segmentName, taskType);
                    break;
                }
                Integer partitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
                if (partitionId == null) {
                    LOGGER.warn("Partition ID not found for segment: {}, skipping it for {}", segmentName, taskType);
                    continue;
                }
                // Multiply in double: the long product size * validDocs can overflow for large
                // segments. totalDocs is non-zero here, otherwise the deletion branch above
                // (0 == 0) would have been taken.
                double expectedSizeAfterCompaction = ((double) segmentSizeInBytes * totalValidDocs) / totalDocs;
                candidatesByPartition.computeIfAbsent(partitionId, ignored -> new ArrayList<>())
                        .add(new SegmentMergerMetadata(segmentZKMetadata, totalValidDocs, totalInvalidDocs, expectedSizeAfterCompaction));
                break;
            }
        }

        Map<Integer, List<List<SegmentMergerMetadata>>> groupsByPartition = new HashMap<>();
        for (Map.Entry<Integer, List<SegmentMergerMetadata>> partitionEntry : candidatesByPartition.entrySet()) {
            List<SegmentMergerMetadata> partitionSegments = partitionEntry.getValue();
            partitionSegments.sort(Comparator.comparingLong(
                    (SegmentMergerMetadata metadata) -> metadata.getSegmentZKMetadata().getCreationTime()));

            List<List<SegmentMergerMetadata>> rankedGroups =
                    groupSegmentsForMerge(partitionSegments, maxRecordsPerSegment, maxSegmentsPerTask, maxRecordsPerTask, maxOutputSizeInBytes)
                            .stream()
                            // A group of one segment has nothing to merge with.
                            .filter(group -> group.size() > 1)
                            .sorted((left, right) -> {
                                int byInvalidDocsDesc = Long.compare(sumInvalidDocs(right), sumInvalidDocs(left));
                                return byInvalidDocsDesc != 0 ? byInvalidDocsDesc : Integer.compare(right.size(), left.size());
                            })
                            .collect(Collectors.toList());
            if (!rankedGroups.isEmpty()) {
                groupsByPartition.put(partitionEntry.getKey(), rankedGroups);
            }
        }
        return new SegmentSelectionResult(groupsByPartition, new ArrayList<>(segmentsForDeletion));
    }

    /** Reads "outputSegmentMaxSize" from the task configs; falls back to Long.MAX_VALUE when absent or unparsable. */
    private static long resolveOutputSegmentMaxSizeInBytes(String tableNameWithType, Map<String, String> taskConfigs) {
        try {
            if (!taskConfigs.containsKey("outputSegmentMaxSize")) {
                LOGGER.info("No configured outputSegmentMaxSizeInByte for {}, defaulting to Long.MAX_VALUE", tableNameWithType);
                return Long.MAX_VALUE;
            }
            String configuredSize = taskConfigs.get("outputSegmentMaxSize");
            LOGGER.info("Configured outputSegmentMaxSizeInByte: {} for {}", configuredSize, tableNameWithType);
            return DataSizeUtils.toBytes(configuredSize);
        } catch (Exception e) {
            // Deliberate best effort: a bad size config must not prevent task generation.
            LOGGER.warn("Invalid value outputSegmentMaxSizeInBytes configured for {}, defaulting to Long.MAX_VALUE", tableNameWithType, e);
            return Long.MAX_VALUE;
        }
    }

    /** Greedily packs the (already ordered) segments into consecutive groups that respect the given limits. */
    private static List<List<SegmentMergerMetadata>> groupSegmentsForMerge(List<SegmentMergerMetadata> orderedSegments, long maxRecordsPerSegment, long maxSegmentsPerTask, long maxRecordsPerTask, long maxOutputSizeInBytes) {
        List<List<SegmentMergerMetadata>> groups = new ArrayList<>();
        List<SegmentMergerMetadata> currentGroup = new ArrayList<>();
        long groupValidDocs = 0;
        long groupTotalDocs = 0;
        double groupOutputSizeInBytes = 0.0d;

        for (SegmentMergerMetadata segment : orderedSegments) {
            long validDocs = segment.getValidDocIds();
            long invalidDocs = segment.getInvalidDocIds();
            double expectedSizeInBytes = segment.getSegmentSizeInBytes();

            boolean exceedsLimits = groupValidDocs + validDocs > maxRecordsPerSegment
                    || currentGroup.size() >= maxSegmentsPerTask
                    || groupTotalDocs + validDocs + invalidDocs >= maxRecordsPerTask
                    || groupOutputSizeInBytes + expectedSizeInBytes >= maxOutputSizeInBytes;
            if (exceedsLimits) {
                // Close the running group (if any) and start a new one with this segment.
                if (!currentGroup.isEmpty()) {
                    groups.add(currentGroup);
                }
                currentGroup = new ArrayList<>();
                groupValidDocs = 0;
                groupTotalDocs = 0;
                groupOutputSizeInBytes = 0.0d;
            }
            currentGroup.add(segment);
            groupValidDocs += validDocs;
            groupTotalDocs += validDocs + invalidDocs;
            groupOutputSizeInBytes += expectedSizeInBytes;
        }
        if (!currentGroup.isEmpty()) {
            groups.add(currentGroup);
        }
        return groups;
    }

    /** Sums the invalid-doc counts of all segments in the group. */
    private static long sumInvalidDocs(List<SegmentMergerMetadata> group) {
        long total = 0;
        for (SegmentMergerMetadata segment : group) {
            total += segment.getInvalidDocIds();
        }
        return total;
    }

    /**
     * Filters the given segments down to those eligible for compact-merge.
     *
     * <p>A segment qualifies when its download URL is not blank, its status reports completed, and
     * its end time is at or before {@code j} minus the "bufferTimePeriod" config (default
     * {@code DEFAULT_BUFFER_PERIOD}). Segments with a blank download URL are logged and skipped.
     *
     * @param map task configs for this task type
     * @param list ZK metadata of the segments to examine
     * @param j reference time in milliseconds
     * @return the eligible segments, in input order
     */
    @VisibleForTesting
    public static List<SegmentZKMetadata> getCandidateSegments(Map<String, String> map, List<SegmentZKMetadata> list, long j) {
        String bufferPeriod = map.getOrDefault("bufferTimePeriod", DEFAULT_BUFFER_PERIOD);
        long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
        List<SegmentZKMetadata> candidates = new ArrayList<>();
        for (SegmentZKMetadata segment : list) {
            if (StringUtils.isBlank(segment.getDownloadUrl())) {
                LOGGER.warn("Skipping segment {} for task as download url is empty", segment.getSegmentName());
                continue;
            }
            if (segment.getStatus().isCompleted() && segment.getEndTimeMs() <= j - bufferMs) {
                candidates.add(segment);
            }
        }
        return candidates;
    }

    /**
     * Collects the segment names recorded under the "UpsertCompactMergeTask.mergedSegments" key of
     * the segments' custom maps. The value is treated as a comma-separated list.
     *
     * @param list ZK metadata of the segments to inspect
     * @return the union of all recorded segment names
     */
    @VisibleForTesting
    protected static Set<String> getAlreadyMergedSegments(List<SegmentZKMetadata> list) {
        final String mergedSegmentsKey = "UpsertCompactMergeTask.mergedSegments";
        Set<String> alreadyMerged = new HashSet<>();
        for (SegmentZKMetadata segment : list) {
            Map<String, String> customMap = segment.getCustomMap();
            if (customMap == null || customMap.isEmpty()) {
                continue;
            }
            String mergedSegments = customMap.get(mergedSegmentsKey);
            if (StringUtils.isBlank(mergedSegments)) {
                continue;
            }
            for (String mergedSegmentName : StringUtils.split(mergedSegments, ",")) {
                alreadyMerged.add(mergedSegmentName);
            }
        }
        return alreadyMerged;
    }

    /**
     * Validates the table and task configuration for this task type.
     *
     * <p>Checks, in order: the table is REALTIME; upsert is enabled; "bufferTimePeriod" (if present)
     * is parsable as a period; an upsert config exists and has snapshot enabled; and
     * "outputSegmentMaxSize" (if present) is parsable as a data size.
     *
     * @param tableConfig table config to validate
     * @param schema table schema (not consulted by this method)
     * @param map task configs for this task type
     * @throws IllegalStateException if a state check fails
     * @throws NullPointerException if the upsert config is missing
     */
    public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> map) {
        final String taskType = "UpsertCompactMergeTask";

        boolean isRealtimeTable = tableConfig.getTableType() == TableType.REALTIME;
        Preconditions.checkState(isRealtimeTable, taskType + " only supports realtime tables!");
        Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for " + taskType);

        // Parsed only for validation; the parsing call is relied on to reject a malformed period.
        if (map.containsKey("bufferTimePeriod")) {
            TimeUtils.convertPeriodToMillis(map.get("bufferTimePeriod"));
        }

        UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
        Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for " + taskType);
        Preconditions.checkState(upsertConfig.isEnableSnapshot(),
                "'enableSnapshot' from UpsertConfig must be enabled for " + taskType);

        // Parsed only for validation; the parsing call is relied on to reject a malformed size.
        if (map.containsKey("outputSegmentMaxSize")) {
            DataSizeUtils.toBytes(map.get("outputSegmentMaxSize"));
        }
    }

    /**
     * Joins the download URLs of the given segments with commas, in list order.
     *
     * @param list segments whose download URLs are joined
     * @return the comma-separated download URLs
     */
    @VisibleForTesting
    protected String getDownloadUrl(List<SegmentMergerMetadata> list) {
        List<String> downloadUrls = new ArrayList<>(list.size());
        for (SegmentMergerMetadata segment : list) {
            downloadUrls.add(segment.getSegmentZKMetadata().getDownloadUrl());
        }
        return StringUtils.join(downloadUrls, ",");
    }

    /**
     * Joins the CRC values of the given segments with commas, in list order.
     *
     * @param list segments whose CRCs are joined
     * @return the comma-separated CRC values
     */
    @VisibleForTesting
    protected String getSegmentCrcList(List<SegmentMergerMetadata> list) {
        return list.stream()
                .map(segment -> String.valueOf(segment.getSegmentZKMetadata().getCrc()))
                .collect(Collectors.joining(","));
    }
}
