package org.apache.pinot.core.data.manager.realtime;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.$internal.com.google.common.base.Preconditions;
import org.apache.pinot.$internal.org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.$internal.org.apache.commons.io.FileUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.class */
public class RealtimeTableDataManager extends BaseTableDataManager {
    private SegmentBuildTimeLeaseExtender _leaseExtender;
    private RealtimeSegmentStatsHistory _statsHistory;
    private final Semaphore _segmentBuildSemaphore;
    private static final String STATS_FILE_NAME = "segment-stats.ser";
    private static final String CONSUMERS_DIR = "consumers";
    private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
    private TableDedupMetadataManager _tableDedupMetadataManager;
    private TableUpsertMetadataManager _tableUpsertMetadataManager;
    private final ExecutorService _segmentAsyncExecutorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("SegmentAsyncExecutorService"));
    private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap();
    private final AtomicBoolean _allSegmentsLoaded = new AtomicBoolean();

    public RealtimeTableDataManager(Semaphore semaphore) {
        this._segmentBuildSemaphore = semaphore;
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doInit() {
        this._leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(this._instanceId, this._serverMetrics, this._tableNameWithType);
        File file = new File(this._tableDataDir, STATS_FILE_NAME);
        try {
            this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
        } catch (IOException | ClassNotFoundException e) {
            this._logger.error("Error reading history object for table {} from {}", new Object[]{this._tableNameWithType, file.getAbsolutePath(), e});
            File file2 = new File(this._tableDataDir, "segment-stats.ser." + UUID.randomUUID());
            try {
                FileUtils.moveFile(file, file2);
                this._logger.warn("Saved unreadable {} into {}. Creating a fresh instance", file.getAbsolutePath(), file2.getAbsolutePath());
                try {
                    this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(file);
                } catch (Exception e2) {
                    Utils.rethrowException(e2);
                }
            } catch (IOException e3) {
                this._logger.error("Could not move {} to {}", new Object[]{file.getAbsolutePath(), file2.getAbsolutePath(), e3});
                throw new RuntimeException(e);
            }
        }
        this._statsHistory.setMinIntervalBetweenUpdatesMillis(TimeUnit.MILLISECONDS.convert(30L, TimeUnit.MINUTES));
        String consumerDir = getConsumerDir();
        File file3 = new File(consumerDir);
        if (file3.exists()) {
            File[] listFiles = file3.listFiles((file4, str) -> {
                return !str.equals(STATS_FILE_NAME);
            });
            Preconditions.checkState(listFiles != null, "Failed to list segment files from consumer dir: {} for table: {}", consumerDir, this._tableNameWithType);
            for (File file5 : listFiles) {
                if (file5.delete()) {
                    this._logger.info("Deleted old file {}", file5.getAbsolutePath());
                } else {
                    this._logger.error("Cannot delete file {}", file5.getAbsolutePath());
                }
            }
        }
        TableConfig tableConfig = ZKMetadataProvider.getTableConfig(this._propertyStore, this._tableNameWithType);
        Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", this._tableNameWithType);
        DedupConfig dedupConfig = tableConfig.getDedupConfig();
        boolean z = dedupConfig != null && dedupConfig.isDedupEnabled();
        if (z) {
            Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
            Preconditions.checkState(tableSchema != null, "Failed to find schema for table: %s", this._tableNameWithType);
            List<String> primaryKeyColumns = tableSchema.getPrimaryKeyColumns();
            Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for dedup");
            this._tableDedupMetadataManager = new TableDedupMetadataManager(this._tableNameWithType, primaryKeyColumns, this._serverMetrics, dedupConfig.getHashFunction());
        }
        UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
        if (upsertConfig == null || upsertConfig.getMode() == UpsertConfig.Mode.NONE) {
            return;
        }
        Preconditions.checkState(!z, "Dedup and upsert cannot be both enabled for table: %s", this._tableUpsertMetadataManager);
        Schema tableSchema2 = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
        Preconditions.checkState(tableSchema2 != null, "Failed to find schema for table: %s", this._tableNameWithType);
        this._tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, tableSchema2, this, this._serverMetrics);
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doStart() {
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doShutdown() {
        this._segmentAsyncExecutorService.shutdown();
        if (this._tableUpsertMetadataManager != null) {
            try {
                this._tableUpsertMetadataManager.close();
            } catch (IOException e) {
                this._logger.warn("Cannot close upsert metadata manager properly for table: {}", this._tableNameWithType, e);
            }
        }
        Iterator<SegmentDataManager> it = this._segmentDataManagerMap.values().iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        if (this._leaseExtender != null) {
            this._leaseExtender.shutDown();
        }
    }

    public RealtimeSegmentStatsHistory getStatsHistory() {
        return this._statsHistory;
    }

    public Semaphore getSegmentBuildSemaphore() {
        return this._segmentBuildSemaphore;
    }

    public String getConsumerDir() {
        String consumerDir = this._tableDataManagerConfig.getConsumerDir();
        File file = consumerDir != null ? new File(consumerDir, this._tableNameWithType) : new File(this._tableDataDir + File.separator + CONSUMERS_DIR);
        if (!file.exists() && !file.mkdirs()) {
            this._logger.error("Failed to create consumer directory {}", file.getAbsolutePath());
        }
        return file.getAbsolutePath();
    }

    public boolean isDedupEnabled() {
        return this._tableDedupMetadataManager != null;
    }

    public boolean isUpsertEnabled() {
        return this._tableUpsertMetadataManager != null;
    }

    public boolean isPartialUpsertEnabled() {
        return this._tableUpsertMetadataManager != null && this._tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager, org.apache.pinot.segment.local.data.manager.TableDataManager
    public void addSegment(String str, TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig) throws Exception {
        RealtimeSegmentDataManager hLRealtimeSegmentDataManager;
        SegmentDataManager segmentDataManager = this._segmentDataManagerMap.get(str);
        if (segmentDataManager != null) {
            this._logger.warn("Skipping adding existing segment: {} for table: {} with data manager class: {}", new Object[]{str, this._tableNameWithType, segmentDataManager.getClass().getSimpleName()});
            return;
        }
        SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider.getSegmentZKMetadata(this._propertyStore, this._tableNameWithType, str);
        Preconditions.checkNotNull(segmentZKMetadata);
        Schema tableSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
        Preconditions.checkNotNull(tableSchema);
        File file = new File(this._indexDir, str);
        LoaderUtils.reloadFailureRecovery(file);
        boolean isHighLevelConsumerSegmentName = SegmentName.isHighLevelConsumerSegmentName(str);
        if (segmentZKMetadata.getStatus().isCompleted()) {
            if (file.exists()) {
                try {
                    addSegment(ImmutableSegmentLoader.load(file, indexLoadingConfig, tableSchema));
                    return;
                } catch (Exception e) {
                    if (isHighLevelConsumerSegmentName) {
                        throw new RuntimeException("Failed to load local HLC segment: " + str, e);
                    }
                    this._logger.error("Caught exception while loading segment: {}, downloading a new copy", str, e);
                    FileUtils.deleteQuietly(file);
                }
            } else if (isHighLevelConsumerSegmentName) {
                throw new RuntimeException("Failed to find local copy for committed HLC segment: " + str);
            }
            downloadAndReplaceSegment(str, segmentZKMetadata, indexLoadingConfig, tableConfig);
            return;
        }
        FileUtils.deleteQuietly(file);
        if (!isValid(tableSchema, tableConfig.getIndexingConfig())) {
            this._logger.error("Not adding segment {}", str);
            throw new RuntimeException("Mismatching schema/table config for " + this._tableNameWithType);
        }
        VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(tableSchema, str);
        if (isHighLevelConsumerSegmentName) {
            hLRealtimeSegmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, ZKMetadataProvider.getInstanceZKMetadata(this._propertyStore, this._instanceId), this, this._indexDir.getAbsolutePath(), indexLoadingConfig, tableSchema, this._serverMetrics);
        } else {
            LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
            int partitionGroupId = lLCSegmentName.getPartitionGroupId();
            Semaphore computeIfAbsent = this._partitionGroupIdToSemaphoreMap.computeIfAbsent(Integer.valueOf(partitionGroupId), num -> {
                return new Semaphore(1);
            });
            PartitionUpsertMetadataManager orCreatePartitionManager = this._tableUpsertMetadataManager != null ? this._tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null;
            PartitionDedupMetadataManager orCreatePartitionManager2 = this._tableDedupMetadataManager != null ? this._tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null;
            if ((isDedupEnabled() || isPartialUpsertEnabled()) && !this._allSegmentsLoaded.get()) {
                synchronized (this._allSegmentsLoaded) {
                    if (!this._allSegmentsLoaded.get()) {
                        TableStateUtils.waitForAllSegmentsLoaded(this._helixManager, this._tableNameWithType);
                        this._allSegmentsLoaded.set(true);
                    }
                }
            }
            hLRealtimeSegmentDataManager = new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, this._indexDir.getAbsolutePath(), indexLoadingConfig, SegmentGeneratorConfig.updateSchemaWithTimestampIndexes(tableSchema, SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(tableConfig)), lLCSegmentName, computeIfAbsent, this._serverMetrics, orCreatePartitionManager, orCreatePartitionManager2);
        }
        this._logger.info("Initialized RealtimeSegmentDataManager - " + str);
        this._segmentDataManagerMap.put(str, hLRealtimeSegmentDataManager);
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager, org.apache.pinot.segment.local.data.manager.TableDataManager
    public void addSegment(ImmutableSegment immutableSegment) {
        if (isUpsertEnabled()) {
            handleUpsert(immutableSegment);
            return;
        }
        if (isDedupEnabled()) {
            buildDedupMeta((ImmutableSegmentImpl) immutableSegment);
        }
        super.addSegment(immutableSegment);
    }

    private void buildDedupMeta(ImmutableSegmentImpl immutableSegmentImpl) {
        String segmentName = immutableSegmentImpl.getSegmentName();
        Integer realtimeSegmentPartitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, this._tableNameWithType, this._helixManager, null);
        Preconditions.checkNotNull(realtimeSegmentPartitionId, String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName, this._tableNameWithType));
        PartitionDedupMetadataManager orCreatePartitionManager = this._tableDedupMetadataManager.getOrCreatePartitionManager(realtimeSegmentPartitionId.intValue());
        immutableSegmentImpl.enableDedup(orCreatePartitionManager);
        orCreatePartitionManager.addSegment(immutableSegmentImpl);
    }

    private void handleUpsert(ImmutableSegment immutableSegment) {
        String segmentName = immutableSegment.getSegmentName();
        this._logger.info("Adding immutable segment: {} to upsert-enabled table: {}", segmentName, this._tableNameWithType);
        Integer realtimeSegmentPartitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, this._tableNameWithType, this._helixManager, null);
        Preconditions.checkNotNull(realtimeSegmentPartitionId, String.format("Failed to get partition id for segment: %s (upsert-enabled table: %s)", segmentName, this._tableNameWithType));
        PartitionUpsertMetadataManager orCreatePartitionManager = this._tableUpsertMetadataManager.getOrCreatePartitionManager(realtimeSegmentPartitionId.intValue());
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.DOCUMENT_COUNT, immutableSegment.getSegmentMetadata().getTotalDocs());
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
        SegmentDataManager put = this._segmentDataManagerMap.put(segmentName, new ImmutableSegmentDataManager(immutableSegment));
        if (put == null) {
            orCreatePartitionManager.addSegment(immutableSegment);
            this._logger.info("Added new immutable segment: {} to upsert-enabled table: {}", segmentName, this._tableNameWithType);
            return;
        }
        IndexSegment segment = put.getSegment();
        orCreatePartitionManager.replaceSegment(immutableSegment, segment);
        Logger logger = this._logger;
        Object[] objArr = new Object[3];
        objArr[0] = segment instanceof ImmutableSegment ? "immutable" : "mutable";
        objArr[1] = segmentName;
        objArr[2] = this._tableNameWithType;
        logger.info("Replaced {} segment: {} of upsert-enabled table: {}", objArr);
        releaseSegment(put);
    }

    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected boolean allowDownload(String str, SegmentZKMetadata segmentZKMetadata) {
        return (SegmentName.isHighLevelConsumerSegmentName(str) || segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS || "".equals(segmentZKMetadata.getDownloadUrl())) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void downloadAndReplaceSegment(String str, SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, TableConfig tableConfig) {
        String downloadUrl = segmentZKMetadata.getDownloadUrl();
        if ("".equals(downloadUrl)) {
            if (!isPeerSegmentDownloadEnabled(tableConfig)) {
                throw new RuntimeException("Peer segment download not enabled for segment " + str);
            }
            downloadSegmentFromPeer(str, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
            return;
        }
        try {
            downloadSegmentFromDeepStore(str, indexLoadingConfig, downloadUrl);
        } catch (Exception e) {
            this._logger.warn("Download segment {} from deepstore uri {} failed.", new Object[]{str, downloadUrl, e});
            if (!isPeerSegmentDownloadEnabled(tableConfig)) {
                throw e;
            }
            downloadSegmentFromPeer(str, tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), indexLoadingConfig);
        }
    }

    private void downloadSegmentFromDeepStore(String str, IndexLoadingConfig indexLoadingConfig, String str2) {
        File file = new File(this._indexDir, str + ".tar.gz");
        try {
            try {
                SegmentFetcherFactory.fetchSegmentToLocal(str2, file);
                this._logger.info("Downloaded file from {} to {}; Length of downloaded file: {}", new Object[]{str2, file, Long.valueOf(file.length())});
                untarAndMoveSegment(str, indexLoadingConfig, file);
                FileUtils.deleteQuietly(file);
            } catch (Exception e) {
                this._logger.warn("Failed to download segment {} from deep store: ", str, e);
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    private void untarAndMoveSegment(String str, IndexLoadingConfig indexLoadingConfig, File file) throws IOException {
        File tmpSegmentDataDir = getTmpSegmentDataDir("tmp-" + str + "." + System.currentTimeMillis());
        try {
            File file2 = TarGzCompressionUtils.untar(file, tmpSegmentDataDir).get(0);
            this._logger.info("Uncompressed file {} into tmp dir {}", file, tmpSegmentDataDir);
            File file3 = new File(this._indexDir, str);
            FileUtils.deleteQuietly(file3);
            FileUtils.moveDirectory(file2, file3);
            this._logger.info("Replacing LLC Segment {}", str);
            replaceLLSegment(str, indexLoadingConfig);
            FileUtils.deleteQuietly(tmpSegmentDataDir);
        } catch (Throwable th) {
            FileUtils.deleteQuietly(tmpSegmentDataDir);
            throw th;
        }
    }

    private boolean isPeerSegmentDownloadEnabled(TableConfig tableConfig) {
        return CommonConstants.HTTP_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()) || CommonConstants.HTTPS_PROTOCOL.equalsIgnoreCase(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
    }

    private void downloadSegmentFromPeer(String str, String str2, IndexLoadingConfig indexLoadingConfig) {
        File file = new File(this._indexDir, str + ".tar.gz");
        try {
            try {
                List<URI> peerServerURIs = PeerServerSegmentFinder.getPeerServerURIs(str, str2, this._helixManager);
                SegmentFetcherFactory.getSegmentFetcher(str2).fetchSegmentToLocal(peerServerURIs, file);
                this._logger.info("Fetched segment {} from: {} to: {} of size: {}", new Object[]{str, peerServerURIs, file, Long.valueOf(file.length())});
                untarAndMoveSegment(str, indexLoadingConfig, file);
                FileUtils.deleteQuietly(file);
            } catch (Exception e) {
                this._logger.warn("Download and move segment {} from peer with scheme {} failed.", new Object[]{str, str2, e});
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            FileUtils.deleteQuietly(file);
            throw th;
        }
    }

    public void replaceHLSegment(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception {
        ZKMetadataProvider.setSegmentZKMetadata(this._propertyStore, this._tableNameWithType, segmentZKMetadata);
        addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, segmentZKMetadata.getSegmentName()), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
    }

    public void replaceLLSegment(String str, IndexLoadingConfig indexLoadingConfig) {
        try {
            addSegment(ImmutableSegmentLoader.load(new File(this._indexDir, str), indexLoadingConfig, ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType)));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String getServerInstance() {
        return this._instanceId;
    }

    private boolean isValid(Schema schema, IndexingConfig indexingConfig) {
        List<String> sortedColumn = indexingConfig.getSortedColumn();
        boolean z = true;
        if (CollectionUtils.isNotEmpty(sortedColumn)) {
            String str = sortedColumn.get(0);
            if (sortedColumn.size() > 1) {
                this._logger.warn("More than one sorted column configured. Using {}", str);
            }
            if (!schema.getFieldSpecFor(str).isSingleValueField()) {
                this._logger.error("Cannot configure multi-valued column {} as sorted column", str);
                z = false;
            }
        }
        try {
            SchemaUtils.validate(schema);
        } catch (Exception e) {
            this._logger.error("Caught exception while validating schema: {}", schema.getSchemaName(), e);
            z = false;
        }
        return z;
    }
}
