package org.apache.pinot.server.starter.helix;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
import org.apache.pinot.core.util.SegmentRefreshSemaphore;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.shaded.com.google.common.base.Preconditions;
import org.apache.pinot.shaded.com.google.common.cache.CacheBuilder;
import org.apache.pinot.shaded.com.google.common.cache.CacheLoader;
import org.apache.pinot.shaded.com.google.common.cache.LoadingCache;
import org.apache.pinot.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/server/starter/helix/HelixInstanceDataManager.class */
public class HelixInstanceDataManager implements InstanceDataManager {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) HelixInstanceDataManager.class);
    private HelixInstanceDataManagerConfig _instanceDataManagerConfig;
    private String _instanceId;
    private HelixManager _helixManager;
    private ServerMetrics _serverMetrics;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private SegmentUploader _segmentUploader;
    private long _externalViewDroppedMaxWaitMs;
    private long _externalViewDroppedCheckInternalMs;
    private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
    private ExecutorService _segmentRefreshExecutor;
    private ExecutorService _segmentPreloadExecutor;
    private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>();
    private Supplier<Boolean> _isServerReadyToServeQueries = () -> {
        return false;
    };

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> supplier) {
        this._isServerReadyToServeQueries = supplier;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void init(PinotConfiguration pinotConfiguration, HelixManager helixManager, ServerMetrics serverMetrics) throws ConfigurationException {
        LOGGER.info("Initializing Helix instance data manager");
        this._instanceDataManagerConfig = new HelixInstanceDataManagerConfig(pinotConfiguration);
        LOGGER.info("HelixInstanceDataManagerConfig: {}", this._instanceDataManagerConfig);
        this._instanceId = this._instanceDataManagerConfig.getInstanceId();
        this._helixManager = helixManager;
        this._serverMetrics = serverMetrics;
        this._segmentUploader = new PinotFSSegmentUploader(this._instanceDataManagerConfig.getSegmentStoreUri(), ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), this._serverMetrics);
        this._externalViewDroppedMaxWaitMs = this._instanceDataManagerConfig.getExternalViewDroppedMaxWaitMs();
        this._externalViewDroppedCheckInternalMs = this._instanceDataManagerConfig.getExternalViewDroppedCheckIntervalMs();
        initInstanceDataDir(new File(this._instanceDataManagerConfig.getInstanceDataDir()));
        File file = new File(this._instanceDataManagerConfig.getInstanceSegmentTarDir());
        if (!file.exists()) {
            Preconditions.checkState(file.mkdirs());
        }
        SegmentBuildTimeLeaseExtender.initExecutor();
        int maxParallelRefreshThreads = getMaxParallelRefreshThreads();
        Preconditions.checkArgument(maxParallelRefreshThreads > 0, "SegmentRefreshExecutor requires a positive pool size but got: " + maxParallelRefreshThreads);
        this._segmentRefreshExecutor = Executors.newFixedThreadPool(maxParallelRefreshThreads, new ThreadFactoryBuilder().setNameFormat("segment-refresh-thread-%d").build());
        LOGGER.info("Created SegmentRefreshExecutor with pool size: {}", Integer.valueOf(maxParallelRefreshThreads));
        int maxSegmentPreloadThreads = this._instanceDataManagerConfig.getMaxSegmentPreloadThreads();
        if (maxSegmentPreloadThreads > 0) {
            this._segmentPreloadExecutor = Executors.newFixedThreadPool(maxSegmentPreloadThreads, new ThreadFactoryBuilder().setNameFormat("segment-preload-thread-%d").build());
            LOGGER.info("Created SegmentPreloadExecutor with pool size: {}", Integer.valueOf(maxSegmentPreloadThreads));
        } else {
            LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}", Integer.valueOf(maxSegmentPreloadThreads));
        }
        TableDataManagerProvider.init(this._instanceDataManagerConfig);
        LOGGER.info("Initialized Helix instance data manager");
        this._errorCache = CacheBuilder.newBuilder().maximumSize(this._instanceDataManagerConfig.getErrorCacheSize()).build(new CacheLoader<Pair<String, String>, SegmentErrorInfo>() { // from class: org.apache.pinot.server.starter.helix.HelixInstanceDataManager.1
            @Override // org.apache.pinot.shaded.com.google.common.cache.CacheLoader
            public SegmentErrorInfo load(Pair<String, String> pair) {
                return null;
            }
        });
    }

    private void initInstanceDataDir(File file) {
        if (!file.exists()) {
            Preconditions.checkState(file.mkdirs(), "Failed to create instance data dir: %s", file);
            return;
        }
        File[] listFiles = file.listFiles((file2, str) -> {
            return TableNameBuilder.isTableResource(str);
        });
        if (listFiles != null) {
            for (File file3 : listFiles) {
                File file4 = new File(file3, RealtimeSegmentDataManager.RESOURCE_TEMP_DIR_NAME);
                try {
                    FileUtils.deleteDirectory(file4);
                } catch (IOException e) {
                    LOGGER.error("Failed to delete temporary resource dir: {}, continue with error", file4, e);
                }
                try {
                    if (FileUtils.isEmptyDirectory(file3)) {
                        FileUtils.deleteDirectory(file3);
                    }
                } catch (IOException e2) {
                    LOGGER.error("Failed to delete empty table data dir: {}, continue with error", file3, e2);
                }
            }
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public String getInstanceId() {
        return this._instanceId;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void start() {
        this._propertyStore = this._helixManager.getHelixPropertyStore();
        LOGGER.info("Helix instance data manager started");
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public synchronized void shutDown() {
        this._segmentRefreshExecutor.shutdownNow();
        if (this._segmentPreloadExecutor != null) {
            this._segmentPreloadExecutor.shutdownNow();
        }
        Iterator<TableDataManager> it2 = this._tableDataManagerMap.values().iterator();
        while (it2.hasNext()) {
            it2.next().shutDown();
        }
        SegmentBuildTimeLeaseExtender.shutdownExecutor();
        LOGGER.info("Helix instance data manager shut down");
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void addRealtimeSegment(String str, String str2) throws Exception {
        LOGGER.info("Adding segment: {} to table: {}", str2, str);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", str);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, tableConfig);
        Preconditions.checkState(tableSchema != null, "Failed to find schema for table: %s", str);
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, str, str2);
        Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", str2, str);
        this._tableDataManagerMap.computeIfAbsent(str, str3 -> {
            return createTableDataManager(str3, tableConfig);
        }).addSegment(str2, new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig, tableSchema), segmentZKMetadata);
        LOGGER.info("Added segment: {} to table: {}", str2, str);
    }

    private TableDataManager createTableDataManager(String str, TableConfig tableConfig) {
        LOGGER.info("Creating table data manager for table: {}", str);
        TableDataManager tableDataManager = TableDataManagerProvider.getTableDataManager(new TableDataManagerConfig(this._instanceDataManagerConfig, tableConfig), this._instanceId, this._propertyStore, this._serverMetrics, this._helixManager, this._segmentPreloadExecutor, this._errorCache, this._isServerReadyToServeQueries);
        tableDataManager.start();
        LOGGER.info("Created table data manager for table: {}", str);
        return tableDataManager;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void deleteTable(String str) throws Exception {
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find table data manager for table: {}, skip deleting the table", str);
            return;
        }
        LOGGER.info("Shutting down table data manager for table: {}", str);
        tableDataManager.shutDown();
        LOGGER.info("Finished shutting down table data manager for table: {}", str);
        try {
            HelixDataAccessor helixDataAccessor = this._helixManager.getHelixDataAccessor();
            PropertyKey externalView = helixDataAccessor.keyBuilder().externalView(str);
            long currentTimeMillis = System.currentTimeMillis() + this._externalViewDroppedMaxWaitMs;
            do {
                ExternalView externalView2 = (ExternalView) helixDataAccessor.getProperty(externalView);
                if (externalView2 == null) {
                    LOGGER.info("ExternalView is dropped for table: {}", str);
                    this._tableDataManagerMap.remove(str);
                    return;
                } else {
                    if (externalView2.getRecord().getMapFields().isEmpty()) {
                        LOGGER.info("ExternalView is empty for table: {}", str);
                        this._tableDataManagerMap.remove(str);
                        return;
                    }
                    Thread.sleep(this._externalViewDroppedCheckInternalMs);
                }
            } while (System.currentTimeMillis() < currentTimeMillis);
            LOGGER.warn("ExternalView still exists after {}ms for table: {}", Long.valueOf(this._externalViewDroppedMaxWaitMs), str);
            this._tableDataManagerMap.remove(str);
        } catch (Throwable th) {
            this._tableDataManagerMap.remove(str);
            throw th;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void offloadSegment(String str, String str2) {
        LOGGER.info("Removing segment: {} from table: {}", str2, str);
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find data manager for table: {}, skipping removing segment: {}", str, str2);
        } else {
            tableDataManager.removeSegment(str2);
            LOGGER.info("Removed segment: {} from table: {}", str2, str);
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void deleteSegment(String str, String str2) throws Exception {
        Lock segmentLock = SegmentLocks.getSegmentLock(str, str2);
        try {
            segmentLock.lock();
            File segmentDataDirectory = getSegmentDataDirectory(str, str2);
            if (segmentDataDirectory.exists()) {
                FileUtils.deleteQuietly(segmentDataDirectory);
                LOGGER.info("Deleted segment directory {} on default tier", segmentDataDirectory);
            }
            SegmentDirectoryLoader segmentDirectoryLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(this._instanceDataManagerConfig.getSegmentDirectoryLoader());
            if (segmentDirectoryLoader != null) {
                LOGGER.info("Deleting segment: {} further with segment loader: {}", str2, this._instanceDataManagerConfig.getSegmentDirectoryLoader());
                segmentDirectoryLoader.delete(new SegmentDirectoryLoaderContext.Builder().setSegmentName(str2).setTableDataDir(this._instanceDataManagerConfig.getInstanceDataDir() + "/" + str).build());
            }
        } finally {
            segmentLock.unlock();
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void reloadSegment(String str, String str2, boolean z) throws Exception {
        LOGGER.info("Reloading single segment: {} in table: {}", str2, str);
        SegmentMetadata segmentMetadata = getSegmentMetadata(str, str2);
        if (segmentMetadata == null) {
            LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", str2, str);
            return;
        }
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        reloadSegmentWithMetadata(str, segmentMetadata, tableConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, str), z);
        LOGGER.info("Reloaded single segment: {} in table: {}", str2, str);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void reloadAllSegments(String str, boolean z, SegmentRefreshSemaphore segmentRefreshSemaphore) throws Exception {
        LOGGER.info("Reloading all segments in table: {}", str);
        reloadSegmentsWithMetadata(str, getAllSegmentsMetadata(str), z, segmentRefreshSemaphore);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void reloadSegments(String str, List<String> list, boolean z, SegmentRefreshSemaphore segmentRefreshSemaphore) throws Exception {
        LOGGER.info("Reloading multiple segments: {} in table: {}", list, str);
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find table data manager for table: {}, skipping reloading segments: {}", str, list);
            return;
        }
        ArrayList arrayList = new ArrayList();
        List<SegmentDataManager> acquireSegments = tableDataManager.acquireSegments(list, arrayList);
        if (!arrayList.isEmpty()) {
            LOGGER.warn("Failed to get segment data manager for segments: {} of table: {}, skipping reloading them", arrayList, tableDataManager);
        }
        ArrayList arrayList2 = new ArrayList(acquireSegments.size());
        try {
            Iterator<SegmentDataManager> it2 = acquireSegments.iterator();
            while (it2.hasNext()) {
                arrayList2.add(it2.next().getSegment().getSegmentMetadata());
            }
            reloadSegmentsWithMetadata(str, arrayList2, z, segmentRefreshSemaphore);
        } finally {
            Iterator<SegmentDataManager> it3 = acquireSegments.iterator();
            while (it3.hasNext()) {
                tableDataManager.releaseSegment(it3.next());
            }
        }
    }

    private void reloadSegmentsWithMetadata(String str, List<SegmentMetadata> list, boolean z, SegmentRefreshSemaphore segmentRefreshSemaphore) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkNotNull(tableConfig);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, str);
        ArrayList arrayList = new ArrayList();
        AtomicReference atomicReference = new AtomicReference();
        CompletableFuture.allOf((CompletableFuture[]) list.stream().map(segmentMetadata -> {
            return CompletableFuture.runAsync(() -> {
                String name = segmentMetadata.getName();
                try {
                    segmentRefreshSemaphore.acquireSema(segmentMetadata.getName(), LOGGER);
                    try {
                        reloadSegmentWithMetadata(str, segmentMetadata, tableConfig, tableSchema, z);
                        segmentRefreshSemaphore.releaseSema();
                    } catch (Throwable th) {
                        segmentRefreshSemaphore.releaseSema();
                        throw th;
                    }
                } catch (Exception e) {
                    LOGGER.error("Caught exception while reloading segment: {} in table: {}", name, str, e);
                    arrayList.add(name);
                    atomicReference.set(e);
                }
            }, this._segmentRefreshExecutor);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).get();
        if (atomicReference.get() != null) {
            throw new RuntimeException(String.format("Failed to reload %d/%d segments: %s in table: %s", Integer.valueOf(arrayList.size()), Integer.valueOf(list.size()), arrayList, str), (Throwable) atomicReference.get());
        }
        LOGGER.info("Reloaded segments with metadata in table: {}. Duration: {}", str, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private void reloadSegmentWithMetadata(String str, SegmentMetadata segmentMetadata, TableConfig tableConfig, @Nullable Schema schema, boolean z) throws Exception {
        String name = segmentMetadata.getName();
        LOGGER.info("Reloading segment: {} in table: {} with forceDownload: {}", name, str, Boolean.valueOf(z));
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            LOGGER.warn("Failed to find table data manager for table: {}, skipping reloading segment", str);
            return;
        }
        if (!segmentMetadata.isMutableSegment()) {
            SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, str, name);
            Preconditions.checkNotNull(segmentZKMetadata);
            Lock segmentLock = SegmentLocks.getSegmentLock(str, name);
            try {
                segmentLock.lock();
                IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig, schema);
                indexLoadingConfig.setErrorOnColumnBuildFailure(true);
                tableDataManager.reloadSegment(name, indexLoadingConfig, segmentZKMetadata, segmentMetadata, schema, z);
                LOGGER.info("Reloaded segment: {} of table: {}", name, str);
                segmentLock.unlock();
                return;
            } catch (Throwable th) {
                segmentLock.unlock();
                throw th;
            }
        }
        SegmentDataManager acquireSegment = tableDataManager.acquireSegment(name);
        if (acquireSegment == null) {
            LOGGER.warn("Failed to find segment data manager for table: {}, segment: {}, skipping reloading segment", str, name);
            return;
        }
        try {
            if (!this._instanceDataManagerConfig.shouldReloadConsumingSegment()) {
                LOGGER.warn("Skip reloading consuming segment: {} in table: {} as configured", name, str);
                tableDataManager.releaseSegment(acquireSegment);
            } else if (!(acquireSegment instanceof LLRealtimeSegmentDataManager)) {
                LOGGER.warn("Cannot reload non-LLC consuming segment: {} in table: {}", name, str);
                tableDataManager.releaseSegment(acquireSegment);
            } else {
                LOGGER.info("Reloading (force committing) LLC consuming segment: {} in table: {}", name, str);
                ((LLRealtimeSegmentDataManager) acquireSegment).forceCommit();
                tableDataManager.releaseSegment(acquireSegment);
            }
        } catch (Throwable th2) {
            tableDataManager.releaseSegment(acquireSegment);
            throw th2;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void addOrReplaceSegment(String str, String str2) throws Exception {
        LOGGER.info("Adding or replacing segment: {} for table: {}", str2, str);
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, str);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", str);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, tableConfig);
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, str, str2);
        Preconditions.checkState(segmentZKMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", str2, str);
        Lock segmentLock = SegmentLocks.getSegmentLock(str, str2);
        try {
            segmentLock.lock();
            this._tableDataManagerMap.computeIfAbsent(str, str3 -> {
                return createTableDataManager(str3, tableConfig);
            }).addOrReplaceSegment(str2, new IndexLoadingConfig(this._instanceDataManagerConfig, tableConfig, tableSchema), segmentZKMetadata, getSegmentMetadata(str, str2));
            LOGGER.info("Added or replaced segment: {} of table: {}", str2, str);
            segmentLock.unlock();
        } catch (Throwable th) {
            segmentLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public Set<String> getAllTables() {
        return this._tableDataManagerMap.keySet();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    @Nullable
    public TableDataManager getTableDataManager(String str) {
        return this._tableDataManagerMap.get(str);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    @Nullable
    public SegmentMetadata getSegmentMetadata(String str, String str2) {
        SegmentDataManager acquireSegment;
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null || (acquireSegment = tableDataManager.acquireSegment(str2)) == null) {
            return null;
        }
        try {
            SegmentMetadata segmentMetadata = acquireSegment.getSegment().getSegmentMetadata();
            tableDataManager.releaseSegment(acquireSegment);
            return segmentMetadata;
        } catch (Throwable th) {
            tableDataManager.releaseSegment(acquireSegment);
            throw th;
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public List<SegmentMetadata> getAllSegmentsMetadata(String str) {
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager == null) {
            return Collections.emptyList();
        }
        List<SegmentDataManager> acquireAllSegments = tableDataManager.acquireAllSegments();
        try {
            ArrayList arrayList = new ArrayList(acquireAllSegments.size());
            Iterator<SegmentDataManager> it2 = acquireAllSegments.iterator();
            while (it2.hasNext()) {
                arrayList.add(it2.next().getSegment().getSegmentMetadata());
            }
            return arrayList;
        } finally {
            Iterator<SegmentDataManager> it3 = acquireAllSegments.iterator();
            while (it3.hasNext()) {
                tableDataManager.releaseSegment(it3.next());
            }
        }
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public File getSegmentDataDirectory(String str, String str2) {
        return new File(new File(this._instanceDataManagerConfig.getInstanceDataDir(), str), str2);
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public String getSegmentFileDirectory() {
        return this._instanceDataManagerConfig.getInstanceSegmentTarDir();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public int getMaxParallelRefreshThreads() {
        return this._instanceDataManagerConfig.getMaxParallelRefreshThreads();
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
        return this._propertyStore;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public SegmentUploader getSegmentUploader() {
        return this._segmentUploader;
    }

    @Override // org.apache.pinot.core.data.manager.InstanceDataManager
    public void forceCommit(String str, Set<String> set) {
        Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(str), String.format("Force commit is only supported for segments of realtime tables - table name: %s segment names: %s", str, set));
        TableDataManager tableDataManager = this._tableDataManagerMap.get(str);
        if (tableDataManager != null) {
            set.forEach(str2 -> {
                SegmentDataManager acquireSegment = tableDataManager.acquireSegment(str2);
                if (acquireSegment != null) {
                    try {
                        if (acquireSegment instanceof LLRealtimeSegmentDataManager) {
                            ((LLRealtimeSegmentDataManager) acquireSegment).forceCommit();
                        }
                    } finally {
                        tableDataManager.releaseSegment(acquireSegment);
                    }
                }
            });
        }
    }
}
