package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.com.google.common.collect.ImmutableList;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task generator for {@link MinionConstants.SegmentGenerationAndPushTask#TASK_TYPE}.
 *
 * <p>Lists the files under a table's configured input directory and emits one {@link PinotTaskConfig}
 * per input file. Each task config is a copy of the batch config enriched with that single input file
 * URI, its relative output location (when an output dir is configured), a sequence id, segment name
 * generator settings, the push mode and the controller VIP URL.
 */
@TaskGenerator
public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationAndPushTaskGenerator.class);
    private static final BatchConfigProperties.SegmentPushType DEFAULT_SEGMENT_PUSH_TYPE =
            BatchConfigProperties.SegmentPushType.TAR;
    /** Maximum number of input file URIs echoed in the "final input files" info log line. */
    private static final int MAX_INPUT_FILES_TO_LOG = 10;

    @Override
    public String getTaskType() {
        return MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
    }

    /**
     * Generates one task per new input file for each of the given tables.
     *
     * <p>Files already recorded in an existing segment's custom metadata (only consulted for APPEND
     * ingestion) and files claimed by currently running tasks of this type are excluded. At most
     * {@code tableMaxNumTasks} tasks are generated per table, counted across all of the table's batch
     * configs. A cap of 0 skips the table, and a missing, unparsable or negative cap means "no limit".
     * A failure while handling one batch config is logged and does not stop the others.
     *
     * @param list table configs to generate tasks for; each must carry a task config for this task type
     * @return the generated task configs, possibly empty
     */
    @Override
    public List<PinotTaskConfig> generateTasks(List<TableConfig> list) {
        List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
        for (TableConfig tableConfig : list) {
            String tableName = tableConfig.getTableName();
            TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
            Preconditions.checkNotNull(tableTaskConfig);
            Map<String, String> taskConfigs =
                    tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
            Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableName);

            int tableMaxNumTasks = parseTableMaxNumTasks(taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY));
            if (tableMaxNumTasks == 0) {
                // A cap of 0 disables generation for this table only. Using 'break' here would silently
                // skip every remaining table in the list.
                continue;
            }
            if (tableConfig.getIngestionConfig() == null
                    || tableConfig.getIngestionConfig().getBatchIngestionConfig() == null) {
                LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no batch ingestion config for table: {}",
                        tableName);
                continue;
            }

            String batchSegmentIngestionType = IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig);
            int tableNumTasks = 0;
            for (Map<String, String> batchConfigMap
                    : tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()) {
                if (tableNumTasks >= tableMaxNumTasks) {
                    // The per-table cap applies across all batch configs, not to each one separately.
                    break;
                }
                try {
                    URI inputDirURI =
                            SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
                    // Work on a copy so record reader defaults are not written back into the table config.
                    Map<String, String> batchConfig = new HashMap<>(batchConfigMap);
                    updateRecordReaderConfigs(batchConfig);

                    List<SegmentZKMetadata> existingSegmentsZKMetadata = Collections.emptyList();
                    if (BatchConfigProperties.SegmentIngestionType.APPEND.name()
                            .equalsIgnoreCase(batchSegmentIngestionType)) {
                        existingSegmentsZKMetadata = getSegmentsZKMetadataForTable(tableName);
                    }
                    Set<String> excludedInputFiles = getExistingSegmentInputFiles(existingSegmentsZKMetadata);
                    Set<String> inputFilesFromRunningTasks = getInputFilesFromRunningTasks(tableName);
                    excludedInputFiles.addAll(inputFilesFromRunningTasks);
                    LOGGER.info("Trying to extract input files from path: {}, and exclude input files from existing segments metadata: {}, and input files from running tasks: {}",
                            inputDirURI, excludedInputFiles, inputFilesFromRunningTasks);

                    List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfig, inputDirURI, excludedInputFiles);
                    if (inputFileURIs.size() < MAX_INPUT_FILES_TO_LOG) {
                        LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
                    } else {
                        LOGGER.info("Final input files for task config generation: {}...",
                                inputFileURIs.subList(0, MAX_INPUT_FILES_TO_LOG));
                    }

                    for (URI inputFileURI : inputFileURIs) {
                        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
                                getSingleFileGenerationTaskConfig(tableName, tableNumTasks, batchConfig, inputFileURI,
                                        null)));
                        tableNumTasks++;
                        if (tableNumTasks >= tableMaxNumTasks) {
                            break;
                        }
                    }
                } catch (Exception e) {
                    LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
                            tableConfig, taskConfigs, e);
                }
            }
        }
        return pinotTaskConfigs;
    }

    /**
     * Generates tasks for an ad-hoc request. Table-level configs for this task type are merged with
     * {@code map}, and entries in {@code map} take precedence. Every file under the input dir becomes a
     * task with a fixed segment name of the form {@code <table>_<uuid>_<sequenceId>}.
     *
     * @param tableConfig the target table
     * @param map request-level task configs that override the table-level ones
     * @return one task config per input file, or an empty list when no input file is found
     * @throws Exception rethrown (after logging) if listing input files or building a task config fails
     */
    @Override
    public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> map) throws Exception {
        String uuid = UUID.randomUUID().toString();
        String tableName = tableConfig.getTableName();
        Map<String, String> taskConfigs = new HashMap<>();
        TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
        if (tableTaskConfig != null) {
            Map<String, String> tableLevelConfigs =
                    tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
            // The table may have task configs for other task types only.
            if (tableLevelConfigs != null) {
                taskConfigs.putAll(tableLevelConfigs);
            }
        }
        taskConfigs.putAll(map);
        try {
            URI inputDirURI = SegmentGenerationUtils.getDirectoryURI(taskConfigs.get(BatchConfigProperties.INPUT_DIR_URI));
            List<URI> inputFileURIs = getInputFilesFromDirectory(taskConfigs, inputDirURI, Collections.emptySet());
            if (inputFileURIs.isEmpty()) {
                LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
                return ImmutableList.of();
            }
            if (!taskConfigs.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
                // Infer the input format from the first file's extension when not configured explicitly.
                taskConfigs.put(BatchConfigProperties.INPUT_FORMAT,
                        extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
            }
            updateRecordReaderConfigs(taskConfigs);

            LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
            List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(inputFileURIs.size());
            for (int sequenceId = 0; sequenceId < inputFileURIs.size(); sequenceId++) {
                pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
                        getSingleFileGenerationTaskConfig(tableName, sequenceId, taskConfigs,
                                inputFileURIs.get(sequenceId), generateFixedSegmentName(tableName, uuid, sequenceId))));
            }
            return pinotTaskConfigs;
        } catch (Exception e) {
            LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
                    tableConfig, map, e);
            throw e;
        }
    }

    /**
     * Parses the table-level max-number-of-tasks setting.
     *
     * @param value raw config value, may be null
     * @return the parsed cap; {@link Integer#MAX_VALUE} ("no limit") when the value is missing, not an
     *     integer, or negative
     */
    private static int parseTableMaxNumTasks(@Nullable String value) {
        if (value == null) {
            return Integer.MAX_VALUE;
        }
        try {
            int parsed = Integer.parseInt(value);
            // Negative caps were never reachable by the task counter, i.e. they effectively meant "no limit".
            return parsed < 0 ? Integer.MAX_VALUE : parsed;
        } catch (NumberFormatException e) {
            return Integer.MAX_VALUE;
        }
    }

    /** Builds the fixed segment name {@code <tableName>_<uuid>_<sequenceId>}. */
    private String generateFixedSegmentName(String tableName, String uuid, int sequenceId) {
        return String.format("%s_%s_%d", tableName, uuid, sequenceId);
    }

    /**
     * Returns the extension of the last segment of {@code path} (the text after its final '.').
     *
     * @param path a '/'-separated path, e.g. {@link URI#getPath()}
     * @return the file extension without the dot
     * @throws UnsupportedOperationException if the file name has no (or an empty) extension
     */
    private String extractFormatFromFileSuffix(String path) {
        // Only inspect the file name: a '.' in a parent directory (e.g. "/data.v1/part-0") is not an extension.
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        int extensionStart = fileName.lastIndexOf('.');
        if (extensionStart < 0 || extensionStart == fileName.length() - 1) {
            throw new UnsupportedOperationException("No file extension found");
        }
        return fileName.substring(extensionStart + 1);
    }

    /**
     * Collects the input file URIs recorded in the configs of running tasks of this task type for the table.
     */
    private Set<String> getInputFilesFromRunningTasks(String tableName) {
        Set<String> inputFileURIs = new HashSet<>();
        TaskGeneratorUtils.forRunningTasks(tableName, MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
                this._clusterInfoAccessor, runningTaskConfig -> {
                    String inputFileURI = (String) runningTaskConfig.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY);
                    if (inputFileURI != null) {
                        inputFileURIs.add(inputFileURI);
                    }
                });
        return inputFileURIs;
    }

    /**
     * Builds the config of a task that processes exactly one input file.
     *
     * @param tableName table the segment belongs to
     * @param sequenceId sequence id of the task within this generation run
     * @param batchConfig batch config to copy; it is not modified
     * @param inputFileURI the single input file for the task
     * @param segmentName fixed segment name, or null to use the "simple" name generator
     * @return a new mutable config map
     * @throws URISyntaxException if the configured input/output dir URIs are malformed
     */
    private Map<String, String> getSingleFileGenerationTaskConfig(String tableName, int sequenceId,
            Map<String, String> batchConfig, URI inputFileURI, @Nullable String segmentName)
            throws URISyntaxException {
        URI inputDirURI = SegmentGenerationUtils.getDirectoryURI(batchConfig.get(BatchConfigProperties.INPUT_DIR_URI));
        URI outputDirURI = null;
        if (batchConfig.containsKey(BatchConfigProperties.OUTPUT_DIR_URI)) {
            outputDirURI = SegmentGenerationUtils.getDirectoryURI(batchConfig.get(BatchConfigProperties.OUTPUT_DIR_URI));
        }
        String pushMode = IngestionConfigUtils.getPushMode(batchConfig);

        Map<String, String> taskConfig = new HashMap<>(batchConfig);
        taskConfig.put("tableName", tableName);
        taskConfig.put(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
        if (outputDirURI != null) {
            // Mirror the file's position under the input dir within the output dir.
            taskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
                    SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI).toString());
        }
        taskConfig.put("sequenceId", String.valueOf(sequenceId));
        if (!taskConfig.containsKey(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE)) {
            if (segmentName == null) {
                taskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, "simple");
            } else {
                taskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, "fixed");
                taskConfig.put("segmentNameGenerator.configs.segment.name", segmentName);
            }
        }
        taskConfig.put("append.uuid.to.segment.name", Boolean.toString(true));
        // The configured push mode is only honoured when an output dir exists; otherwise fall back to TAR.
        if (outputDirURI == null || pushMode == null) {
            taskConfig.put(BatchConfigProperties.PUSH_MODE, DEFAULT_SEGMENT_PUSH_TYPE.toString());
        } else {
            taskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
        }
        taskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, this._clusterInfoAccessor.getVipUrl());
        return taskConfig;
    }

    /**
     * Fills in the record reader class and record reader config class registered with the
     * {@link PluginManager} for the configured input format, without overriding explicit settings.
     *
     * @param map batch config to update in place
     */
    private void updateRecordReaderConfigs(Map<String, String> map) {
        String inputFormat = map.get(BatchConfigProperties.INPUT_FORMAT);
        String recordReaderClassName = PluginManager.get().getRecordReaderClassName(inputFormat);
        if (recordReaderClassName != null) {
            map.putIfAbsent(BatchConfigProperties.RECORD_READER_CLASS, recordReaderClassName);
        }
        String recordReaderConfigClassName = PluginManager.get().getRecordReaderConfigClassName(inputFormat);
        if (recordReaderConfigClassName != null) {
            map.putIfAbsent(BatchConfigProperties.RECORD_READER_CONFIG_CLASS, recordReaderConfigClassName);
        }
    }

    /**
     * Recursively lists the files under {@code inputDirURI}, applying the optional include/exclude
     * {@link PathMatcher} patterns from the batch config and skipping directories and already-known files.
     *
     * @param batchConfig batch config providing the file system settings and the include/exclude patterns
     * @param inputDirURI directory to list
     * @param excludedInputFileURIs input file URIs (as strings) to skip
     * @return the matching file URIs; empty if the directory cannot be listed
     * @throws Exception if the input file system cannot be created or closed, or a pattern is invalid
     */
    private List<URI> getInputFilesFromDirectory(Map<String, String> batchConfig, URI inputDirURI,
            Set<String> excludedInputFileURIs) throws Exception {
        try (PinotFS inputDirFS = MinionTaskUtils.getInputPinotFS(batchConfig, inputDirURI)) {
            String includePattern = batchConfig.get(BatchConfigProperties.INCLUDE_FILE_NAME_PATTERN);
            String excludePattern = batchConfig.get(BatchConfigProperties.EXCLUDE_FILE_NAME_PATTERN);

            String[] files;
            try {
                files = inputDirFS.listFiles(inputDirURI, true);
            } catch (IOException e) {
                LOGGER.error("Unable to list files under URI: {}", inputDirURI, e);
                return Collections.emptyList();
            }

            PathMatcher includeMatcher =
                    includePattern != null ? FileSystems.getDefault().getPathMatcher(includePattern) : null;
            PathMatcher excludeMatcher =
                    excludePattern != null ? FileSystems.getDefault().getPathMatcher(excludePattern) : null;

            List<URI> inputFileURIs = new ArrayList<>();
            for (String file : files) {
                LOGGER.debug("Processing file: {}", file);
                if (includeMatcher != null && !includeMatcher.matches(Paths.get(file))) {
                    LOGGER.debug("Exclude file {} as it's not matching includeFilePathMatcher: {}", file, includePattern);
                    continue;
                }
                if (excludeMatcher != null && excludeMatcher.matches(Paths.get(file))) {
                    LOGGER.debug("Exclude file {} as it's matching excludeFilePathMatcher: {}", file, excludePattern);
                    continue;
                }
                try {
                    URI fileURI = SegmentGenerationUtils.getFileURI(file, inputDirURI);
                    if (excludedInputFileURIs.contains(fileURI.toString())) {
                        LOGGER.debug("Skipping already processed inputFileURI: {}", fileURI);
                    } else if (inputDirFS.isDirectory(fileURI)) {
                        LOGGER.debug("Skipping directory: {}", fileURI);
                    } else {
                        inputFileURIs.add(fileURI);
                    }
                } catch (Exception e) {
                    // Best effort: one bad path must not prevent the remaining files from being picked up.
                    LOGGER.error("Failed to construct inputFileURI for path: {}, parent directory URI: {}",
                            file, inputDirURI, e);
                }
            }
            return inputFileURIs;
        }
    }

    /**
     * Collects the input file URIs recorded in the custom metadata of the given segments.
     */
    private Set<String> getExistingSegmentInputFiles(List<SegmentZKMetadata> list) {
        Set<String> inputFileURIs = new HashSet<>();
        for (SegmentZKMetadata segmentZKMetadata : list) {
            Map<String, String> customMap = segmentZKMetadata.getCustomMap();
            if (customMap != null && customMap.containsKey(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY)) {
                inputFileURIs.add(customMap.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
            }
        }
        return inputFileURIs;
    }
}
