package org.apache.pinot.plugin.minion.tasks.upsertcompactmerge;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.DefaultSegmentNumRowProvider;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.class */
/**
 * Minion task executor that merges several segments of an upsert table into new segment(s),
 * keeping only the documents reported as valid by the servers.
 *
 * <p>For every input segment the executor checks the CRC of the downloaded copy against the CRC
 * supplied in the task config, obtains the valid doc ids for that CRC via
 * {@link MinionTaskUtils#getValidDocIdFromServerMatchingCrc}, and feeds a
 * {@link CompactedPinotSegmentRecordReader} per segment into the {@link SegmentProcessorFramework}.
 */
public class UpsertCompactMergeTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactMergeTaskExecutor.class);

    public UpsertCompactMergeTaskExecutor(MinionConf minionConf) {
        super(minionConf);
    }

    /**
     * Merges the given segment directories into compacted segment(s).
     *
     * @param pinotTaskConfig task configuration; the configs {@code tableName}, {@code crc}
     *     (comma separated, positionally aligned with {@code list}) and
     *     {@code maxNumRecordsPerSegment} are read
     * @param list index directories of the segments to merge
     * @param file working directory handed to the segment processor framework
     * @return one conversion result per generated segment directory
     * @throws IllegalStateException if the CRCs do not match, the segments do not share a single
     *     partition id, or no valid doc ids could be obtained for a segment
     * @throws Exception if segment generation or closing of the record readers fails
     */
    @Override // org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor
    protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> list, File file) throws Exception {
        this._eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + list.size());
        String taskType = pinotTaskConfig.getTaskType();
        Map<String, String> configs = pinotTaskConfig.getConfigs();
        LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
        long startTimeMs = System.currentTimeMillis();
        String tableNameWithType = configs.get("tableName");

        SegmentProcessorConfig.Builder configBuilder =
            new SegmentProcessorConfig.Builder().setTableConfig(getTableConfig(tableNameWithType)).setSchema(getSchema(tableNameWithType));
        configBuilder.setProgressObserver(progress -> this._eventObserver.notifyProgress(this._pinotTaskConfig, progress));

        List<SegmentMetadataImpl> segmentMetadataList = new ArrayList<>(list.size());
        for (File segmentDir : list) {
            segmentMetadataList.add(loadSegmentMetadata(segmentDir));
        }

        int partitionId = getCommonPartitionIDForSegments(segmentMetadataList);

        // The merged segment takes the most recent creation time among its inputs.
        Optional<Long> maxCreationTime =
            segmentMetadataList.stream().map(SegmentMetadataImpl::getIndexCreationTime).reduce(Long::max);
        if (maxCreationTime.isEmpty()) {
            LOGGER.error("No valid creation time found for the new merged segment. This might be due to missing creation time for merging segments");
            throw new RuntimeException("No valid creation time found for the new merged segment. This might be due to missing creation time for merging segments");
        }

        String expectedCrcs = Objects.requireNonNull(configs.get("crc"), "Task config 'crc' must be set for task type: " + taskType);
        validateCRCForInputSegments(segmentMetadataList, List.of(expectedCrcs.split(",")));

        List<RecordReader> recordReaders = new ArrayList<>(segmentMetadataList.size());
        List<File> outputSegmentDirs;
        Throwable failure = null;
        try {
            // Readers are opened inside the try so that the ones already created are released
            // when a later segment fails (e.g. no valid doc ids available for it).
            for (SegmentMetadataImpl segmentMetadata : segmentMetadataList) {
                recordReaders.add(createCompactedRecordReader(tableNameWithType, segmentMetadata));
            }
            configBuilder.setCustomCreationTime(maxCreationTime.get());
            configBuilder.setSegmentNameGenerator(new UploadedRealtimeSegmentNameGenerator(TableNameBuilder.extractRawTableName(tableNameWithType), partitionId, System.currentTimeMillis(), "compacted", (String) null));
            SegmentProcessorConfig processorConfig = configBuilder.build();

            this._eventObserver.notifyProgress(this._pinotTaskConfig, "Generating segments");
            outputSegmentDirs = new SegmentProcessorFramework(processorConfig, file,
                SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders), Collections.emptyList(),
                new DefaultSegmentNumRowProvider(Integer.parseInt(configs.get("maxNumRecordsPerSegment")))).process();
        } catch (Throwable t) {
            failure = t;
            throw t;
        } finally {
            closeRecordReaders(recordReaders, failure);
        }

        LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, System.currentTimeMillis() - startTimeMs);
        List<SegmentConversionResult> results = new ArrayList<>(outputSegmentDirs.size());
        for (File outputSegmentDir : outputSegmentDirs) {
            results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentDir.getName()).setTableNameWithType(tableNameWithType).build());
        }
        return results;
    }

    /**
     * Builds the custom-map modifier applied to the ZK metadata of a generated segment. It records
     * the current time and the value of the {@code segmentName} task config.
     */
    @Override // org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor
    public SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, SegmentConversionResult segmentConversionResult) {
        Map<String, String> updates = new TreeMap<>();
        updates.put("UpsertCompactMergeTask.time", String.valueOf(System.currentTimeMillis()));
        updates.put("UpsertCompactMergeTask.mergedSegments", pinotTaskConfig.getConfigs().get("segmentName"));
        return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, updates);
    }

    /**
     * Returns the single partition id shared by all given segments, derived from their names.
     *
     * @param list metadata of the segments to inspect
     * @return the common partition id
     * @throws IllegalStateException if the list is empty, a segment name carries no partition id,
     *     or the segments belong to more than one partition
     */
    int getCommonPartitionIDForSegments(List<SegmentMetadataImpl> list) {
        Set<Integer> partitionIds = list.stream()
            .map(SegmentMetadataImpl::getName)
            .map(UpsertCompactMergeTaskExecutor::partitionIdOf)
            .collect(Collectors.toSet());
        if (partitionIds.isEmpty()) {
            throw new IllegalStateException("No segments provided to determine the common partition id");
        }
        if (partitionIds.size() > 1) {
            throw new IllegalStateException("Found segments with different partition ids during task execution: " + String.valueOf(partitionIds));
        }
        return partitionIds.iterator().next();
    }

    /**
     * Verifies that the CRC of each segment matches the expected CRC at the same position.
     *
     * @param list metadata of the downloaded segments
     * @param list2 expected CRCs, positionally aligned with {@code list}
     * @throws IllegalStateException if fewer CRCs than segments are supplied or any CRC differs
     */
    void validateCRCForInputSegments(List<SegmentMetadataImpl> list, List<String> list2) {
        // Surplus expected CRCs are tolerated (as before); missing ones would otherwise surface as
        // an IndexOutOfBoundsException without any context.
        if (list2.size() < list.size()) {
            String message = String.format("Number of expected crcs (%d) is less than the number of input segments (%d)", list2.size(), list.size());
            LOGGER.error(message);
            throw new IllegalStateException(message);
        }
        for (int i = 0; i < list.size(); i++) {
            SegmentMetadataImpl segmentMetadata = list.get(i);
            String expectedCrc = list2.get(i);
            String actualCrc = segmentMetadata.getCrc();
            if (!Objects.equals(actualCrc, expectedCrc)) {
                String message = String.format("Crc mismatched between ZK and deepstore copy of segment: %s. Expected crc from ZK: %s, crc from deepstore: %s", segmentMetadata.getName(), expectedCrc, actualCrc);
                LOGGER.error(message);
                throw new IllegalStateException(message);
            }
        }
    }

    /** Loads the metadata of the segment stored in the given directory, wrapping any failure. */
    private static SegmentMetadataImpl loadSegmentMetadata(File segmentDir) {
        try {
            return new SegmentMetadataImpl(segmentDir);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Error fetching segment-metadata for segmentDir: %s", segmentDir), e);
        }
    }

    /** Extracts the partition id from a segment name, failing if the name does not carry one. */
    private static int partitionIdOf(String segmentName) {
        Integer partitionId = SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
        if (partitionId == null) {
            throw new IllegalStateException(String.format("Partition id not found for %s", segmentName));
        }
        return partitionId;
    }

    /**
     * Creates a record reader over the valid documents of one segment.
     *
     * @throws IllegalStateException if no valid doc ids matching the segment CRC were obtained;
     *     the task is failed explicitly instead of silently skipping the segment
     */
    private static RecordReader createCompactedRecordReader(String tableNameWithType, SegmentMetadataImpl segmentMetadata) {
        RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentMetadata.getName(), ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, segmentMetadata.getCrc());
        if (validDocIds == null) {
            String message = String.format("No validDocIds found from all servers for segment: %s. They either failed to download or did not match crc from segment copy obtained from deepstore / servers. Expected crc: %s", segmentMetadata.getName(), segmentMetadata.getCrc());
            LOGGER.error(message);
            throw new IllegalStateException(message);
        }
        return new CompactedPinotSegmentRecordReader(segmentMetadata.getIndexDir(), validDocIds);
    }

    /**
     * Closes every reader, even if closing an earlier one fails.
     *
     * @param readers readers to close
     * @param primaryFailure failure already being propagated by the caller, or {@code null}; when
     *     present, close failures are attached to it as suppressed so they do not mask it
     * @throws Exception the first close failure (later ones suppressed) when there is no primary failure
     */
    private static void closeRecordReaders(List<RecordReader> readers, Throwable primaryFailure) throws Exception {
        Exception closeFailure = null;
        for (RecordReader reader : readers) {
            try {
                reader.close();
            } catch (Exception e) {
                if (primaryFailure != null) {
                    primaryFailure.addSuppressed(e);
                } else if (closeFailure == null) {
                    closeFailure = e;
                } else {
                    closeFailure.addSuppressed(e);
                }
            }
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
    }
}
