package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task generator for the {@code RealtimeToOfflineSegmentsTask} minion task.
 *
 * <p>For every REALTIME table handed to {@link #generateTasks(List)} that is not using a high level consumer and
 * has no incomplete task of this type, the generator:
 * <ol>
 *   <li>collects the completed segments of the table and, per partition, the completed LLC segment with the
 *       highest sequence number;</li>
 *   <li>reads the watermark from the minion task metadata (creating it from the smallest segment start time,
 *       rounded down to a multiple of the bucket, when no metadata exists yet);</li>
 *   <li>slides a window of one bucket forward from the watermark until it finds a window that overlaps at least
 *       one completed segment, stopping without a task when the window is not yet older than the buffer or when
 *       the latest completed segment of a partition ends before the window end;</li>
 *   <li>emits one {@link PinotTaskConfig} describing the segments of that window.</li>
 * </ol>
 */
@TaskGenerator
public class RealtimeToOfflineSegmentsTaskGenerator extends BaseTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
    private static final String DEFAULT_BUCKET_PERIOD = "1d";
    private static final String DEFAULT_BUFFER_PERIOD = "2d";

    private static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";

    // Keys read from the table's task config for this task type.
    private static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
    private static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
    private static final String ROUND_BUCKET_TIME_PERIOD_KEY = "roundBucketTimePeriod";
    private static final String MERGE_TYPE_KEY = "mergeType";
    private static final String COLLECTOR_TYPE_KEY = "collectorType";
    private static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType";
    private static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";

    /**
     * @return the task type handled by this generator
     */
    public String getTaskType() {
        return TASK_TYPE;
    }

    /**
     * Generates at most one task config per eligible table.
     *
     * @param tableConfigs table configs to consider
     * @return the generated task configs; empty when no table is eligible
     * @throws IllegalStateException if an otherwise eligible table has no task config for this task type, or its
     *     bucket time period is not positive
     */
    public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();

        for (TableConfig tableConfig : tableConfigs) {
            String realtimeTableName = tableConfig.getTableName();

            if (tableConfig.getTableType() != TableType.REALTIME) {
                LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}", TASK_TYPE, realtimeTableName);
                continue;
            }
            StreamConfig streamConfig =
                new StreamConfig(realtimeTableName, IngestionConfigUtils.getStreamConfigMap(tableConfig));
            if (streamConfig.hasHighLevelConsumerType()) {
                LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}", TASK_TYPE, realtimeTableName);
                continue;
            }
            LOGGER.info("Start generating task configs for table: {} for task: {}", realtimeTableName, TASK_TYPE);

            // Only schedule a new task when nothing of this type is still pending for the table.
            Map<String, ?> incompleteTasks =
                TaskGeneratorUtils.getIncompleteTasks(TASK_TYPE, realtimeTableName, _clusterInfoAccessor);
            if (!incompleteTasks.isEmpty()) {
                LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
                    incompleteTasks.keySet(), realtimeTableName);
                continue;
            }

            List<SegmentZKMetadata> completedSegmentsZKMetadata = new ArrayList<>();
            Map<Integer, String> partitionToLatestCompletedSegmentName = new HashMap<>();
            Set<Integer> allPartitions = new HashSet<>();
            getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata,
                partitionToLatestCompletedSegmentName, allPartitions);
            if (completedSegmentsZKMetadata.isEmpty()) {
                LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
                    realtimeTableName, TASK_TYPE);
                continue;
            }

            // What remains after the removal are the partitions that do not have any completed segment yet.
            allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
            if (!allPartitions.isEmpty()) {
                LOGGER.info(
                    "Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
                    allPartitions, realtimeTableName, TASK_TYPE);
                continue;
            }

            TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
            Preconditions.checkState(tableTaskConfig != null);
            Map<String, String> taskConfigs = tableTaskConfig.getConfigsForTaskType(TASK_TYPE);
            // Guava's Preconditions only substitutes "%s" placeholders (not SLF4J-style "{}").
            Preconditions.checkState(taskConfigs != null, "Task config shouldn't be null for table: %s",
                realtimeTableName);

            String bucketTimePeriod = taskConfigs.getOrDefault(BUCKET_TIME_PERIOD_KEY, DEFAULT_BUCKET_PERIOD);
            String bufferTimePeriod = taskConfigs.getOrDefault(BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
            long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
            long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
            // A non-positive bucket would never advance the window below (endless loop) and would divide by
            // zero when the watermark is initialised, so fail fast with a descriptive message instead.
            Preconditions.checkState(bucketMs > 0, "bucketTimePeriod must be positive for table: %s, got: %s",
                realtimeTableName, bucketTimePeriod);

            long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
            long windowEndMs = windowStartMs + bucketMs;

            List<String> segmentNames = new ArrayList<>();
            List<String> downloadURLs = new ArrayList<>();
            Set<String> latestCompletedSegmentNames = new HashSet<>(partitionToLatestCompletedSegmentName.values());
            boolean skipGenerate = false;
            while (true) {
                // The window must lie entirely before (now - buffer).
                if (windowEndMs > System.currentTimeMillis() - bufferMs) {
                    LOGGER.info(
                        "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task generation: {}",
                        windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, TASK_TYPE);
                    skipGenerate = true;
                    break;
                }

                for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
                    String segmentName = segmentZKMetadata.getSegmentName();
                    long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
                    long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();

                    // Segment time range [start, end] overlaps the window [windowStartMs, windowEndMs).
                    boolean overlapsWindow = windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs;
                    if (!overlapsWindow) {
                        continue;
                    }
                    // The latest completed segment of a partition ends before the window end, so the rest of
                    // the window for that partition is not covered by completed segments yet.
                    if (latestCompletedSegmentNames.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
                        LOGGER.info(
                            "Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task generation: {}",
                            segmentName, TASK_TYPE);
                        skipGenerate = true;
                        break;
                    }
                    segmentNames.add(segmentName);
                    downloadURLs.add(segmentZKMetadata.getDownloadUrl());
                }

                if (skipGenerate || !segmentNames.isEmpty()) {
                    break;
                }
                LOGGER.info(
                    "Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
                    TASK_TYPE, windowStartMs, windowEndMs);
                windowStartMs = windowEndMs;
                windowEndMs += bucketMs;
            }
            if (skipGenerate) {
                continue;
            }

            Map<String, String> configs =
                MinionTaskUtils.getPushTaskConfig(realtimeTableName, taskConfigs, _clusterInfoAccessor);
            configs.put("tableName", realtimeTableName);
            configs.put("segmentName", StringUtils.join(segmentNames, ","));
            configs.put("downloadURL", StringUtils.join(downloadURLs, ","));
            configs.put("uploadURL", _clusterInfoAccessor.getVipUrl() + "/segments");
            configs.put("windowStartMs", String.valueOf(windowStartMs));
            configs.put("windowEndMs", String.valueOf(windowEndMs));

            String roundBucketTimePeriod = taskConfigs.get(ROUND_BUCKET_TIME_PERIOD_KEY);
            if (roundBucketTimePeriod != null) {
                configs.put(ROUND_BUCKET_TIME_PERIOD_KEY, roundBucketTimePeriod);
            }

            // "mergeType" wins; "collectorType" is only consulted when "mergeType" is absent. Whichever value
            // is found is written under both keys.
            String mergeType = taskConfigs.get(MERGE_TYPE_KEY);
            if (mergeType == null) {
                mergeType = taskConfigs.get(COLLECTOR_TYPE_KEY);
            }
            if (mergeType != null) {
                configs.put(MERGE_TYPE_KEY, mergeType);
                configs.put(COLLECTOR_TYPE_KEY, mergeType);
            }

            // Copy every "<something>.aggregationType" entry through unchanged.
            for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
                if (entry.getKey().endsWith(AGGREGATION_TYPE_KEY_SUFFIX)) {
                    configs.put(entry.getKey(), entry.getValue());
                }
            }

            String maxNumRecordsPerSegment = taskConfigs.get(MAX_NUM_RECORDS_PER_SEGMENT_KEY);
            if (maxNumRecordsPerSegment != null) {
                configs.put(MAX_NUM_RECORDS_PER_SEGMENT_KEY, maxNumRecordsPerSegment);
            }

            pinotTaskConfigs.add(new PinotTaskConfig(TASK_TYPE, configs));
            LOGGER.info("Finished generating task configs for table: {} for task: {}", realtimeTableName, TASK_TYPE);
        }
        return pinotTaskConfigs;
    }

    /**
     * Reads the segment ZK metadata of the table and fills the three output collections.
     *
     * @param realtimeTableName table whose segments are inspected
     * @param completedSegmentsZKMetadata output: metadata of every segment whose status is completed
     * @param partitionToLatestCompletedSegmentName output: for each partition, the name of the completed LLC
     *     segment with the highest sequence number
     * @param allPartitions output: partition ids of all segments whose name parses as an LLC segment name,
     *     completed or not
     */
    private void getCompletedSegmentsInfo(String realtimeTableName,
            List<SegmentZKMetadata> completedSegmentsZKMetadata,
            Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> allPartitions) {
        List<SegmentZKMetadata> segmentsZKMetadata = _clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);

        Map<Integer, LLCSegmentName> latestCompletedLLCSegmentNames = new HashMap<>();
        for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
            CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus();
            boolean completed = status.isCompleted();
            if (completed) {
                completedSegmentsZKMetadata.add(segmentZKMetadata);
            }

            // Segments whose name does not parse as an LLC name contribute no partition information.
            LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentZKMetadata.getSegmentName());
            if (llcSegmentName == null) {
                continue;
            }
            int partitionId = llcSegmentName.getPartitionGroupId();
            allPartitions.add(partitionId);
            if (completed) {
                // Keep the candidate only when its sequence number is strictly higher than the current one.
                latestCompletedLLCSegmentNames.merge(partitionId, llcSegmentName,
                    (current, candidate) -> candidate.getSequenceNumber() > current.getSequenceNumber() ? candidate
                        : current);
            }
        }

        for (Map.Entry<Integer, LLCSegmentName> entry : latestCompletedLLCSegmentNames.entrySet()) {
            partitionToLatestCompletedSegmentName.put(entry.getKey(), entry.getValue().getSegmentName());
        }
    }

    /**
     * Returns the watermark stored in the minion task metadata of the table. When no metadata exists, the
     * watermark is initialised to the smallest start time among the given segments, rounded down to a multiple
     * of {@code bucketMs}, and written back through the cluster info accessor before being returned.
     *
     * @param realtimeTableName table whose watermark is requested
     * @param completedSegmentsZKMetadata completed segments, used only when the metadata has to be created
     * @param bucketMs bucket size in milliseconds used for the rounding
     * @return the watermark in milliseconds
     * @throws IllegalStateException if the metadata has to be created and the segment list is empty
     */
    private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
            long bucketMs) {
        ZNRecord taskMetadataZNRecord =
            _clusterInfoAccessor.getMinionTaskMetadataZNRecord(TASK_TYPE, realtimeTableName);
        RealtimeToOfflineSegmentsTaskMetadata taskMetadata = taskMetadataZNRecord != null
            ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(taskMetadataZNRecord) : null;

        if (taskMetadata == null) {
            // No metadata stored yet: derive the watermark from the earliest completed segment.
            long minStartTimeMs = Long.MAX_VALUE;
            for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
                minStartTimeMs = Math.min(minStartTimeMs, segmentZKMetadata.getStartTimeMs());
            }
            Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);

            // Integer division rounds the start time down to the bucket boundary.
            long watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
            taskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
            _clusterInfoAccessor.setMinionTaskMetadata(taskMetadata, TASK_TYPE, -1);
        }
        return taskMetadata.getWatermarkMs();
    }
}
