package org.apache.pinot.plugin.minion.tasks.upsertcompaction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TaskGenerator
/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.class */
public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class);
    private static final String DEFAULT_BUFFER_PERIOD = "7d";
    private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0d;
    private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
    private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;

    /* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator$SegmentSelectionResult.class */
    public static class SegmentSelectionResult {
        private final List<SegmentZKMetadata> _segmentsForCompaction;
        private final List<String> _segmentsForDeletion;

        SegmentSelectionResult(List<SegmentZKMetadata> list, List<String> list2) {
            this._segmentsForCompaction = list;
            this._segmentsForDeletion = list2;
        }

        public List<SegmentZKMetadata> getSegmentsForCompaction() {
            return this._segmentsForCompaction;
        }

        public List<String> getSegmentsForDeletion() {
            return this._segmentsForDeletion;
        }
    }

    public String getTaskType() {
        return "UpsertCompactionTask";
    }

    public List<PinotTaskConfig> generateTasks(List<TableConfig> list) {
        ArrayList arrayList = new ArrayList();
        for (TableConfig tableConfig : list) {
            String tableName = tableConfig.getTableName();
            LOGGER.info("Start generating task configs for table: {}", tableName);
            if (tableConfig.getTaskConfig() == null) {
                LOGGER.warn("Task config is null for table: {}", tableName);
            } else {
                Map configsForTaskType = tableConfig.getTaskConfig().getConfigsForTaskType("UpsertCompactionTask");
                List<SegmentZKMetadata> completedSegments = getCompletedSegments(configsForTaskType, this._clusterInfoAccessor.getSegmentsZKMetadata(tableName), System.currentTimeMillis());
                if (completedSegments.isEmpty()) {
                    LOGGER.info("No completed segments were eligible for compaction for table: {}", tableName);
                } else {
                    Map incompleteTasks = TaskGeneratorUtils.getIncompleteTasks("UpsertCompactionTask", tableName, this._clusterInfoAccessor);
                    if (incompleteTasks.isEmpty()) {
                        PinotHelixResourceManager pinotHelixResourceManager = this._clusterInfoAccessor.getPinotHelixResourceManager();
                        Map serverToSegmentsMap = pinotHelixResourceManager.getServerToSegmentsMap(tableName);
                        try {
                            BiMap dataInstanceAdminEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
                            ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader(this._clusterInfoAccessor.getExecutor(), this._clusterInfoAccessor.getConnectionManager());
                            ValidDocIdsType valueOf = ValidDocIdsType.valueOf(((String) configsForTaskType.getOrDefault("validDocIdsType", ValidDocIdsType.SNAPSHOT.toString())).toUpperCase());
                            Map segmentToValidDocIdsMetadataFromServer = serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableName, serverToSegmentsMap, dataInstanceAdminEndpoints, (List) null, 60000, valueOf.toString(), Integer.parseInt((String) configsForTaskType.getOrDefault("numSegmentsBatchPerServerRequest", String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST))));
                            SegmentSelectionResult processValidDocIdsMetadata = processValidDocIdsMetadata(configsForTaskType, (Map) completedSegments.stream().collect(Collectors.toMap((v0) -> {
                                return v0.getSegmentName();
                            }, Function.identity())), segmentToValidDocIdsMetadataFromServer);
                            LOGGER.info("Selected {} segments for compaction, {} segments for deletion and skipped {} segments for table: {}", new Object[]{Integer.valueOf(processValidDocIdsMetadata.getSegmentsForCompaction().size()), Integer.valueOf(processValidDocIdsMetadata.getSegmentsForDeletion().size()), Integer.valueOf((segmentToValidDocIdsMetadataFromServer.size() - processValidDocIdsMetadata.getSegmentsForCompaction().size()) - processValidDocIdsMetadata.getSegmentsForDeletion().size()), tableName});
                            if (!processValidDocIdsMetadata.getSegmentsForDeletion().isEmpty()) {
                                pinotHelixResourceManager.deleteSegments(tableName, processValidDocIdsMetadata.getSegmentsForDeletion(), "0d");
                                LOGGER.info("Deleted segments containing only invalid records for table: {}, number of segments to be deleted: {}", tableName, processValidDocIdsMetadata.getSegmentsForDeletion());
                            }
                            int i = 0;
                            int maxTasks = getMaxTasks("UpsertCompactionTask", tableName, configsForTaskType);
                            for (SegmentZKMetadata segmentZKMetadata : processValidDocIdsMetadata.getSegmentsForCompaction()) {
                                if (i == maxTasks) {
                                    break;
                                }
                                if (StringUtils.isBlank(segmentZKMetadata.getDownloadUrl())) {
                                    LOGGER.warn("Skipping segment {} for task {} as download url is empty", segmentZKMetadata.getSegmentName(), "UpsertCompactionTask");
                                } else {
                                    HashMap hashMap = new HashMap(getBaseTaskConfigs(tableConfig, List.of(segmentZKMetadata.getSegmentName())));
                                    hashMap.put("downloadURL", segmentZKMetadata.getDownloadUrl());
                                    hashMap.put("uploadURL", this._clusterInfoAccessor.getVipUrl() + "/segments");
                                    hashMap.put("crc", String.valueOf(segmentZKMetadata.getCrc()));
                                    hashMap.put("validDocIdsType", valueOf.toString());
                                    hashMap.put("ignoreCrcMismatch", (String) configsForTaskType.getOrDefault("ignoreCrcMismatch", String.valueOf(false)));
                                    arrayList.add(new PinotTaskConfig("UpsertCompactionTask", hashMap));
                                    i++;
                                }
                            }
                            LOGGER.info("Finished generating {} tasks configs for table: {}", Integer.valueOf(i), tableName);
                        } catch (InvalidConfigException e) {
                            throw new RuntimeException((Throwable) e);
                        }
                    } else {
                        LOGGER.warn("Found incomplete tasks: {} for same table: {} and task type: {}. Skipping task generation.", new Object[]{incompleteTasks.keySet(), tableName, "UpsertCompactionTask"});
                    }
                }
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> map, Map<String, SegmentZKMetadata> map2, Map<String, List<ValidDocIdsMetadataInfo>> map3) {
        double parseDouble = Double.parseDouble(map.getOrDefault("invalidRecordsThresholdPercent", String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
        long parseLong = Long.parseLong(map.getOrDefault("invalidRecordsThresholdCount", String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (String str : map3.keySet()) {
            if (map2.containsKey(str)) {
                SegmentZKMetadata segmentZKMetadata = map2.get(str);
                Iterator<ValidDocIdsMetadataInfo> it = map3.get(str).iterator();
                while (true) {
                    if (it.hasNext()) {
                        ValidDocIdsMetadataInfo next = it.next();
                        long totalInvalidDocs = next.getTotalInvalidDocs();
                        if (segmentZKMetadata.getCrc() != Long.parseLong(next.getSegmentCrc())) {
                            LOGGER.warn("CRC mismatch for segment: {}, (segmentZKMetadata={}, validDocIdsMetadata={})", new Object[]{str, Long.valueOf(segmentZKMetadata.getCrc()), next.getSegmentCrc()});
                        } else {
                            long totalDocs = next.getTotalDocs();
                            double d = (totalInvalidDocs / totalDocs) * 100.0d;
                            if (totalInvalidDocs == totalDocs) {
                                LOGGER.debug("Segment {} contains only invalid records, adding it to the deletion list", str);
                                arrayList2.add(segmentZKMetadata.getSegmentName());
                            } else if (d < parseDouble || totalInvalidDocs < parseLong) {
                                LOGGER.debug("Segment {} contains {} invalid records out of {} total records (count threshold: {}, percent threshold: {}), skipping it for compaction", new Object[]{str, Long.valueOf(totalInvalidDocs), Long.valueOf(totalDocs), Long.valueOf(parseLong), Double.valueOf(parseDouble)});
                            } else {
                                LOGGER.debug("Segment {} contains {} invalid records out of {} total records (count threshold: {}, percent threshold: {}), adding it to the compaction list", new Object[]{str, Long.valueOf(totalInvalidDocs), Long.valueOf(totalDocs), Long.valueOf(parseLong), Double.valueOf(parseDouble)});
                                arrayList.add(Pair.of(segmentZKMetadata, Long.valueOf(totalInvalidDocs)));
                            }
                        }
                    }
                }
            } else {
                LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", str);
            }
        }
        arrayList.sort((pair, pair2) -> {
            if (((Long) pair.getValue()).longValue() > ((Long) pair2.getValue()).longValue()) {
                return -1;
            }
            return ((Long) pair.getValue()).equals(pair2.getValue()) ? 0 : 1;
        });
        return new SegmentSelectionResult((List) arrayList.stream().map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList()), arrayList2);
    }

    @VisibleForTesting
    public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> map, List<SegmentZKMetadata> list, long j) {
        ArrayList arrayList = new ArrayList();
        long longValue = TimeUtils.convertPeriodToMillis(map.getOrDefault("bufferTimePeriod", DEFAULT_BUFFER_PERIOD)).longValue();
        for (SegmentZKMetadata segmentZKMetadata : list) {
            if (segmentZKMetadata.getStatus().isCompleted() && segmentZKMetadata.getEndTimeMs() <= j - longValue) {
                arrayList.add(segmentZKMetadata);
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    public static int getMaxTasks(String str, String str2, Map<String, String> map) {
        int i = Integer.MAX_VALUE;
        String str3 = map.get("tableMaxNumTasks");
        if (str3 != null) {
            try {
                i = Integer.parseInt(str3);
            } catch (Exception e) {
                LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", str2, str);
            }
        }
        return i;
    }

    public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map<String, String> map) {
        Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, "UpsertCompactionTask only supports realtime tables!");
        Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask");
        if (map.containsKey("bufferTimePeriod")) {
            TimeUtils.convertPeriodToMillis(map.get("bufferTimePeriod"));
        }
        if (map.containsKey("invalidRecordsThresholdPercent")) {
            Preconditions.checkState(Double.parseDouble(map.get("invalidRecordsThresholdPercent")) >= DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT && Double.parseDouble(map.get("invalidRecordsThresholdPercent")) <= 100.0d, "invalidRecordsThresholdPercent must be >= 0 and <= 100");
        }
        if (map.containsKey("invalidRecordsThresholdCount")) {
            Preconditions.checkState(Long.parseLong(map.get("invalidRecordsThresholdCount")) >= DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT, "invalidRecordsThresholdCount must be >= 1");
        }
        Preconditions.checkState(map.containsKey("invalidRecordsThresholdPercent") || map.containsKey("invalidRecordsThresholdCount"), "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided");
        String orDefault = map.getOrDefault("validDocIdsType", "snapshot");
        if (orDefault.equals(ValidDocIdsType.SNAPSHOT.toString())) {
            UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
            Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
            Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format("'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = %s", orDefault));
        } else if (orDefault.equals(ValidDocIdsType.IN_MEMORY_WITH_DELETE.toString())) {
            UpsertConfig upsertConfig2 = tableConfig.getUpsertConfig();
            Preconditions.checkNotNull(upsertConfig2, "UpsertConfig must be provided for UpsertCompactionTask");
            Preconditions.checkNotNull(upsertConfig2.getDeleteRecordColumn(), String.format("deleteRecordColumn must be provided for UpsertCompactionTask with validDocIdsType = %s", orDefault));
        }
    }
}
