package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.data.manager.DuoSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.local.utils.tablestate.TableStateUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.slf4j.Logger;

@ThreadSafe
/* loaded from: input_file:org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.class */
public class RealtimeTableDataManager extends BaseTableDataManager {
    // Lease extender obtained in doInit(); shut down in doShutdown() when non-null.
    private SegmentBuildTimeLeaseExtender _leaseExtender;
    // Segment stats history deserialized from the stats file in doInit().
    private RealtimeSegmentStatsHistory _statsHistory;
    // Semaphore handed in through the constructor and exposed via getSegmentBuildSemaphore().
    private final Semaphore _segmentBuildSemaphore;
    // One single-permit semaphore per partition group id, created lazily in doAddConsumingSegment().
    private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap;
    // Name of the serialized stats-history file inside the table data directory.
    private static final String STATS_FILE_NAME = "segment-stats.ser";
    // Sub-directory name used for the consumer directory when none is configured.
    private static final String CONSUMERS_DIR = "consumers";
    // Minimum interval, in minutes, between stats-history updates.
    private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
    // Minimum interval between two "all segments loaded" checks; set to 5 seconds in the static initializer.
    public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS;
    // Supplier passed to the IngestionDelayTracker created in doInit().
    private final Supplier<Boolean> _isServerReadyToServeQueries;
    // Created in doInit(); all ingestion-metric methods delegate to it.
    private IngestionDelayTracker _ingestionDelayTracker;
    // Non-null only when dedup is enabled in the table config (see doInit()).
    private TableDedupMetadataManager _tableDedupMetadataManager;
    // Non-null only when the table's upsert mode is not NONE (see doInit()).
    private TableUpsertMetadataManager _tableUpsertMetadataManager;
    // Passed to every RealtimeSegmentDataManager; assigned in doInit().
    private BooleanSupplier _isTableReadyToConsumeData;
    // NOTE(review): looks like the compiler's assertion flag exposed by decompilation; read in doAddConsumingSegment().
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Creates a manager whose "server ready to serve queries" supplier always yields {@code true}.
     *
     * @param semaphore semaphore later returned by {@link #getSegmentBuildSemaphore()}
     */
    public RealtimeTableDataManager(Semaphore semaphore) {
        this(semaphore, () -> true);
    }

    /**
     * Creates a manager with an explicit readiness supplier.
     *
     * @param semaphore semaphore later returned by {@link #getSegmentBuildSemaphore()}
     * @param supplier  supplier handed to the ingestion delay tracker during {@code doInit()}
     */
    public RealtimeTableDataManager(Semaphore semaphore, Supplier<Boolean> supplier) {
        _segmentBuildSemaphore = semaphore;
        _isServerReadyToServeQueries = supplier;
        _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();
    }

    /**
     * Performs the realtime-specific part of table initialization:
     * <ol>
     *   <li>obtains the lease extender and creates the ingestion delay tracker;</li>
     *   <li>loads the segment stats history; if the stats file cannot be read it is moved aside under a
     *       random suffix and the history is loaded again from the (now absent) original path;</li>
     *   <li>deletes every entry in the consumer directory except one named like the stats file;</li>
     *   <li>creates the dedup and/or upsert metadata manager when the table config enables them
     *       (enabling both is rejected);</li>
     *   <li>installs the "table ready to consume" supplier.</li>
     * </ol>
     *
     * @throws IllegalStateException if the schema is missing, dedup has no primary key columns, dedup and
     *                               upsert are both enabled, or the consumer directory cannot be listed
     * @throws RuntimeException      if the unreadable stats file cannot be moved aside
     */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doInit() {
        this._leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(this._instanceId, this._serverMetrics, this._tableNameWithType);
        this._ingestionDelayTracker = new IngestionDelayTracker(this._serverMetrics, this._tableNameWithType, this, this._isServerReadyToServeQueries);

        File statsFile = new File(this._tableDataDir, STATS_FILE_NAME);
        try {
            this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
        } catch (IOException | ClassNotFoundException e) {
            this._logger.error("Caught exception while reading stats history from: {}", statsFile.getAbsolutePath(), e);
            // Keep the unreadable file around under a unique name so it can be inspected later.
            File savedFile = new File(this._tableDataDir, STATS_FILE_NAME + "." + UUID.randomUUID());
            try {
                FileUtils.moveFile(statsFile, savedFile);
                this._logger.warn("Saved unreadable {} into {}. Creating a fresh instance", statsFile.getAbsolutePath(), savedFile.getAbsolutePath());
                try {
                    // The original path no longer exists at this point, so this loads a fresh history.
                    this._statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
                } catch (Exception e2) {
                    Utils.rethrowException(e2);
                }
            } catch (IOException moveFailure) {
                this._logger.error("Could not move {} to {}", statsFile.getAbsolutePath(), savedFile.getAbsolutePath(), moveFailure);
                // The read failure is the root cause; the move failure has been logged above.
                throw new RuntimeException(e);
            }
        }
        this._statsHistory.setMinIntervalBetweenUpdatesMillis(TimeUnit.MILLISECONDS.convert(MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES, TimeUnit.MINUTES));

        // Remove leftovers from earlier consumers, sparing anything named like the stats file.
        String consumerDir = getConsumerDir();
        File consumerDirFile = new File(consumerDir);
        if (consumerDirFile.exists()) {
            File[] staleFiles = consumerDirFile.listFiles((dir, name) -> !name.equals(STATS_FILE_NAME));
            Preconditions.checkState(staleFiles != null, "Failed to list segment files from consumer dir: %s for table: %s", consumerDir, this._tableNameWithType);
            for (File staleFile : staleFiles) {
                if (FileUtils.deleteQuietly(staleFile)) {
                    this._logger.info("Deleted old file {}", staleFile.getAbsolutePath());
                } else {
                    this._logger.error("Cannot delete file {}", staleFile.getAbsolutePath());
                }
            }
        }

        DedupConfig dedupConfig = this._tableConfig.getDedupConfig();
        boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled();
        if (dedupEnabled) {
            Schema dedupSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
            Preconditions.checkState(dedupSchema != null, "Failed to find schema for table: %s", this._tableNameWithType);
            Preconditions.checkState(!CollectionUtils.isEmpty(dedupSchema.getPrimaryKeyColumns()), "Primary key columns must be configured for dedup");
            this._tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(this._tableConfig, dedupSchema, this, this._serverMetrics);
        }

        UpsertConfig upsertConfig = this._tableConfig.getUpsertConfig();
        if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) {
            // Report the table name; the upsert manager is still null here and would print "null".
            Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s", this._tableNameWithType);
            Schema upsertSchema = ZKMetadataProvider.getTableSchema(this._propertyStore, this._tableNameWithType);
            Preconditions.checkState(upsertSchema != null, "Failed to find schema for table: %s", this._tableNameWithType);
            this._tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(this._tableConfig, this._instanceDataManagerConfig.getUpsertConfig());
            this._tableUpsertMetadataManager.init(this._tableConfig, upsertSchema, this);
        }

        if (isDedupEnabled() || isPartialUpsertEnabled()) {
            // Dedup and partial upsert gate consumption on TableStateUtils.isAllSegmentsLoaded(). A positive
            // answer is remembered; a negative one is re-checked at most once per check interval.
            this._isTableReadyToConsumeData = new BooleanSupplier() { // from class: org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.1
                volatile boolean _allSegmentsLoaded;
                long _lastCheckTimeMs;

                @Override // java.util.function.BooleanSupplier
                public boolean getAsBoolean() {
                    if (this._allSegmentsLoaded) {
                        return true;
                    }
                    synchronized (this) {
                        if (this._allSegmentsLoaded) {
                            return true;
                        }
                        long nowMs = System.currentTimeMillis();
                        if (nowMs - this._lastCheckTimeMs <= RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS) {
                            return false;
                        }
                        this._lastCheckTimeMs = nowMs;
                        this._allSegmentsLoaded = TableStateUtils.isAllSegmentsLoaded(RealtimeTableDataManager.this._helixManager, RealtimeTableDataManager.this._tableNameWithType);
                        return this._allSegmentsLoaded;
                    }
                }
            };
        } else {
            this._isTableReadyToConsumeData = () -> true;
        }
    }

    /** Start hook; this manager performs no work here. */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doStart() {
    }

    /**
     * Shuts the table down: stops ingestion delay tracking, stops whichever upsert/dedup metadata managers
     * exist, releases and removes all segments, closes those managers, and finally shuts down the lease
     * extender when one was obtained.
     */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doShutdown() {
        this._ingestionDelayTracker.shutdown();

        // Managers are stopped before the segments are released and closed only afterwards.
        if (this._tableUpsertMetadataManager != null) {
            this._tableUpsertMetadataManager.stop();
        }
        if (this._tableDedupMetadataManager != null) {
            this._tableDedupMetadataManager.stop();
        }

        releaseAndRemoveAllSegments();

        try {
            if (this._tableUpsertMetadataManager != null) {
                this._tableUpsertMetadataManager.close();
            }
            if (this._tableDedupMetadataManager != null) {
                this._tableDedupMetadataManager.close();
            }
        } catch (IOException e) {
            this._logger.warn("Caught exception while closing upsert metadata manager", e);
        }

        if (this._leaseExtender != null) {
            this._leaseExtender.shutDown();
        }
    }

    /**
     * Forwards all arguments unchanged to the ingestion delay tracker's {@code updateIngestionMetrics}.
     * The two offset arguments may be {@code null}.
     */
    public void updateIngestionMetrics(String str, int i, long j, long j2, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset, @Nullable StreamPartitionMsgOffset streamPartitionMsgOffset2) {
        this._ingestionDelayTracker.updateIngestionMetrics(str, i, j, j2, streamPartitionMsgOffset, streamPartitionMsgOffset2);
    }

    /**
     * Returns the tracker's ingestion time for the partition group that the given segment belongs to.
     *
     * @param str segment name, parsed as an {@link LLCSegmentName}
     * @return the value reported by the ingestion delay tracker for that partition group
     */
    public long getPartitionIngestionTimeMs(String str) {
        int partitionGroupId = new LLCSegmentName(str).getPartitionGroupId();
        return this._ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId);
    }

    /**
     * Asks the ingestion delay tracker to stop tracking, passing the given string through unchanged.
     *
     * @param str forwarded to {@code stopTrackingPartitionIngestionDelay}
     */
    public void removeIngestionMetrics(String str) {
        this._ingestionDelayTracker.stopTrackingPartitionIngestionDelay(str);
    }

    /**
     * Stops ingestion delay tracking for the partition group of the given segment.
     *
     * @param str segment name, parsed as an {@link LLCSegmentName}
     */
    public void onConsumingToDropped(String str) {
        int partitionGroupId = new LLCSegmentName(str).getPartitionGroupId();
        this._ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
    }

    /**
     * Asks the ingestion delay tracker to mark the partition for verification, passing the given string
     * through unchanged.
     *
     * @param str forwarded to {@code markPartitionForVerification}
     */
    public void onConsumingToOnline(String str) {
        this._ingestionDelayTracker.markPartitionForVerification(str);
    }

    /**
     * Wraps each segment in a {@link SegmentContext}, preserving order. When upsert is enabled and the
     * query options do not request skipping upsert, the upsert metadata manager is given the contexts
     * via {@code setSegmentContexts}.
     *
     * @param list segments to wrap
     * @param map  query options
     * @return one context per input segment
     */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    public List<SegmentContext> getSegmentContexts(List<IndexSegment> list, Map<String, String> map) {
        List<SegmentContext> segmentContexts = new ArrayList<>(list.size());
        for (IndexSegment segment : list) {
            segmentContexts.add(new SegmentContext(segment));
        }
        boolean applyUpsert = isUpsertEnabled() && !QueryOptionsUtils.isSkipUpsert(map);
        if (applyUpsert) {
            this._tableUpsertMetadataManager.setSegmentContexts(segmentContexts, map);
        }
        return segmentContexts;
    }

    /**
     * Collects the partition group ids of all segments that
     * {@code TableStateUtils.getSegmentsInGivenStateForThisInstance} reports in state {@code CONSUMING}.
     *
     * @return a new mutable set of partition group ids
     */
    public Set<Integer> getHostedPartitionsGroupIds() {
        Set<Integer> partitionGroupIds = new HashSet<>();
        for (Object segmentName : TableStateUtils.getSegmentsInGivenStateForThisInstance(this._helixManager, this._tableNameWithType, "CONSUMING")) {
            LLCSegmentName llcSegmentName = new LLCSegmentName((String) segmentName);
            partitionGroupIds.add(llcSegmentName.getPartitionGroupId());
        }
        return partitionGroupIds;
    }

    /** Returns the stats history loaded in {@code doInit()}. */
    public RealtimeSegmentStatsHistory getStatsHistory() {
        return this._statsHistory;
    }

    /** Returns the semaphore that was passed to the constructor. */
    public Semaphore getSegmentBuildSemaphore() {
        return this._segmentBuildSemaphore;
    }

    /**
     * Returns the absolute path of the consumer directory, creating the directory when it is missing.
     * A failed creation is only logged; the path is returned regardless.
     *
     * @return absolute path of the consumer directory
     */
    public String getConsumerDir() {
        File consumerDir = getConsumerDirPath();
        boolean available = consumerDir.exists() || consumerDir.mkdirs();
        if (!available) {
            this._logger.error("Failed to create consumer directory {}", consumerDir.getAbsolutePath());
        }
        return consumerDir.getAbsolutePath();
    }

    /**
     * Resolves the consumer directory without touching the file system: the table-named child of the
     * configured consumer directory when one is configured, otherwise the {@code consumers} entry under
     * the table data directory.
     *
     * @return the consumer directory location
     */
    public File getConsumerDirPath() {
        String configuredDir = this._instanceDataManagerConfig.getConsumerDir();
        if (configuredDir == null) {
            return new File(this._tableDataDir + File.separator + CONSUMERS_DIR);
        }
        return new File(configuredDir, this._tableNameWithType);
    }

    /** Returns {@code true} when a dedup metadata manager exists for this table. */
    public boolean isDedupEnabled() {
        return this._tableDedupMetadataManager != null;
    }

    /** Returns {@code true} when an upsert metadata manager exists for this table. */
    public boolean isUpsertEnabled() {
        return this._tableUpsertMetadataManager != null;
    }

    /** Returns {@code true} when an upsert metadata manager exists and its mode is {@code PARTIAL}. */
    public boolean isPartialUpsertEnabled() {
        if (this._tableUpsertMetadataManager == null) {
            return false;
        }
        return this._tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
    }

    /** Runs the upsert preload step and then the dedup preload step for the given segment. */
    private void handleSegmentPreload(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) {
        handleUpsertPreload(segmentZKMetadata, indexLoadingConfig);
        handleDedupPreload(segmentZKMetadata, indexLoadingConfig);
    }

    /**
     * Triggers segment preloading on the upsert partition manager of the segment's partition. Does
     * nothing when there is no upsert metadata manager or it has preload disabled.
     *
     * @throws IllegalStateException if the partition id of the segment cannot be determined
     */
    private void handleUpsertPreload(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) {
        boolean preloadEnabled = this._tableUpsertMetadataManager != null && this._tableUpsertMetadataManager.isEnablePreload();
        if (!preloadEnabled) {
            return;
        }
        String segmentName = segmentZKMetadata.getSegmentName();
        Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, (String) null);
        Preconditions.checkState(partitionId != null, "Failed to get partition id for segment: " + segmentName + " in upsert-enabled table: " + this._tableNameWithType);
        PartitionUpsertMetadataManager partitionManager = this._tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId.intValue());
        partitionManager.preloadSegments(indexLoadingConfig);
    }

    /**
     * Triggers segment preloading on the dedup partition manager of the segment's partition. Does
     * nothing when there is no dedup metadata manager or it has preload disabled.
     *
     * @throws IllegalStateException if the partition id of the segment cannot be determined
     */
    private void handleDedupPreload(SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig) {
        boolean preloadEnabled = this._tableDedupMetadataManager != null && this._tableDedupMetadataManager.isEnablePreload();
        if (!preloadEnabled) {
            return;
        }
        String segmentName = segmentZKMetadata.getSegmentName();
        Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, segmentZKMetadata, (String) null);
        Preconditions.checkState(partitionId != null, "Failed to get partition id for segment: " + segmentName + " in dedup-enabled table: " + this._tableNameWithType);
        PartitionDedupMetadataManager partitionManager = this._tableDedupMetadataManager.getOrCreatePartitionManager(partitionId.intValue());
        partitionManager.preloadSegments(indexLoadingConfig);
    }

    /**
     * Brings a committed segment ONLINE. After the preload step, one of three paths is taken:
     * the segment is unknown and is added as new; it is currently held by a
     * {@link RealtimeSegmentDataManager} and is transitioned from consuming to online; or it is
     * already held by another kind of manager and is replaced if its CRC mismatches.
     *
     * @param str segment name
     * @throws IllegalStateException if the segment's ZK status is still {@code IN_PROGRESS}
     */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    protected void doAddOnlineSegment(String str) throws Exception {
        SegmentZKMetadata zkMetadata = fetchZKMetadata(str);
        Preconditions.checkState(zkMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS, "Segment: %s of table: %s is not committed, cannot make it ONLINE", str, this._tableNameWithType);
        IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
        indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
        handleSegmentPreload(zkMetadata, indexLoadingConfig);

        SegmentDataManager existingManager = this._segmentDataManagerMap.get(str);
        if (existingManager == null) {
            addNewOnlineSegment(zkMetadata, indexLoadingConfig);
            return;
        }
        if (existingManager instanceof RealtimeSegmentDataManager) {
            this._logger.info("Changing segment: {} from CONSUMING to ONLINE", str);
            ((RealtimeSegmentDataManager) existingManager).goOnlineFromConsuming(zkMetadata);
            onConsumingToOnline(str);
            return;
        }
        replaceSegmentIfCrcMismatch(existingManager, zkMetadata, indexLoadingConfig);
    }

    /**
     * Adds a CONSUMING segment while holding that segment's lock.
     *
     * <p>A failure is recorded through {@code addSegmentError} and then rethrown. The lock is released
     * in a {@code finally} block so it is unlocked exactly once on every path.
     *
     * @param str segment name
     * @throws IllegalStateException     if this table data manager has already been shut down
     * @throws AttemptsExceededException propagated from the underlying add operation
     * @throws RetriableOperationException propagated from the underlying add operation
     */
    public void addConsumingSegment(String str) throws AttemptsExceededException, RetriableOperationException {
        Preconditions.checkState(!this._shutDown, "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", str, this._tableNameWithType);
        this._logger.info("Adding CONSUMING segment: {}", str);
        Lock segmentLock = getSegmentLock(str);
        segmentLock.lock();
        try {
            doAddConsumingSegment(str);
        } catch (Exception e) {
            addSegmentError(str, new SegmentErrorInfo(System.currentTimeMillis(), "Caught exception while adding CONSUMING segment", e));
            throw e;
        } finally {
            segmentLock.unlock();
        }
    }

    /**
     * Creates and starts a {@link RealtimeSegmentDataManager} for the given segment.
     *
     * <p>Returns without adding anything when the segment's ZK status is no longer {@code IN_PROGRESS},
     * or when a data manager for the segment is already registered (checked after the preload step).
     *
     * @param str segment name, parsed as an {@link LLCSegmentName}
     */
    private void doAddConsumingSegment(String str) throws AttemptsExceededException, RetriableOperationException {
        SegmentZKMetadata fetchZKMetadata = fetchZKMetadata(str);
        if (fetchZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
            this._logger.warn("Segment: {} is already committed, skipping adding it as CONSUMING segment", str);
            return;
        }
        IndexLoadingConfig fetchIndexLoadingConfig = fetchIndexLoadingConfig();
        handleSegmentPreload(fetchZKMetadata, fetchIndexLoadingConfig);
        SegmentDataManager segmentDataManager = this._segmentDataManagerMap.get(str);
        if (segmentDataManager != null) {
            this._logger.warn("Segment: {} ({}) already exists, skipping adding it as CONSUMING segment", str, segmentDataManager instanceof RealtimeSegmentDataManager ? "CONSUMING" : "COMPLETED");
            return;
        }
        this._logger.info("Adding new CONSUMING segment: {}", str);
        // Remove any directory with the segment's name left in the index dir before consuming afresh.
        FileUtils.deleteQuietly(new File(this._indexDir, str));
        TableConfig tableConfig = fetchIndexLoadingConfig.getTableConfig();
        Schema schema = fetchIndexLoadingConfig.getSchema();
        // Only enforced when assertions are enabled.
        if (!$assertionsDisabled && (tableConfig == null || schema == null)) {
            throw new AssertionError();
        }
        validate(tableConfig, schema);
        VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, str);
        setDefaultTimeValueIfInvalid(tableConfig, schema, fetchZKMetadata);
        LLCSegmentName lLCSegmentName = new LLCSegmentName(str);
        int partitionGroupId = lLCSegmentName.getPartitionGroupId();
        // All segments of one partition group share a single-permit semaphore.
        Semaphore computeIfAbsent = this._partitionGroupIdToSemaphoreMap.computeIfAbsent(Integer.valueOf(partitionGroupId), num -> {
            return new Semaphore(1);
        });
        // Partition managers are null when the corresponding feature (upsert / dedup) is not enabled.
        PartitionUpsertMetadataManager orCreatePartitionManager = this._tableUpsertMetadataManager != null ? this._tableUpsertMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null;
        RealtimeSegmentDataManager realtimeSegmentDataManager = new RealtimeSegmentDataManager(fetchZKMetadata, tableConfig, this, this._indexDir.getAbsolutePath(), fetchIndexLoadingConfig, schema, lLCSegmentName, computeIfAbsent, this._serverMetrics, orCreatePartitionManager, this._tableDedupMetadataManager != null ? this._tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId) : null, this._isTableReadyToConsumeData);
        // The segment is registered (and tracked for upsert) before consumption starts.
        registerSegment(str, realtimeSegmentDataManager, orCreatePartitionManager);
        if (orCreatePartitionManager != null) {
            orCreatePartitionManager.trackNewlyAddedSegment(str);
        }
        realtimeSegmentDataManager.startConsumption();
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
        this._logger.info("Added new CONSUMING segment: {}", str);
    }

    /**
     * Replaces the time column's default null value with the segment creation time when the configured
     * default cannot be parsed with the column's format or lies outside the valid time range. Does
     * nothing when the table has no time column configured.
     *
     * @param tableConfig       table config providing the time column name
     * @param schema            schema whose time field spec may be mutated
     * @param segmentZKMetadata metadata supplying the creation time used as replacement
     * @throws IllegalStateException if the time column is not present in the schema
     */
    @VisibleForTesting
    static void setDefaultTimeValueIfInvalid(TableConfig tableConfig, Schema schema, SegmentZKMetadata segmentZKMetadata) {
        String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
        if (StringUtils.isEmpty(timeColumn)) {
            return;
        }
        DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
        Preconditions.checkState(timeFieldSpec != null, "Failed to find time field: %s from schema: %s", timeColumn, schema.getSchemaName());
        String configuredDefault = timeFieldSpec.getDefaultNullValueString();
        DateTimeFormatSpec formatSpec = timeFieldSpec.getFormatSpec();

        boolean configuredDefaultValid;
        try {
            configuredDefaultValid = TimeUtils.timeValueInValidRange(formatSpec.fromFormatToMillis(configuredDefault));
        } catch (Exception ignored) {
            // A default that cannot be parsed is handled exactly like one that is out of range.
            configuredDefaultValid = false;
        }
        if (configuredDefaultValid) {
            return;
        }

        Object creationTimeDefault = timeFieldSpec.getDataType().convert(formatSpec.fromMillisToFormat(segmentZKMetadata.getCreationTime()));
        timeFieldSpec.setDefaultNullValue(creationTimeDefault);
        LOGGER.info("Default time: {} does not comply with format: {}, using creation time: {} as the default time for table: {}", configuredDefault, timeFieldSpec.getFormat(), creationTimeDefault, tableConfig.getTableName());
    }

    /**
     * Adds an immutable segment. Upsert tables are handled entirely by the upsert path. Otherwise dedup
     * handling runs first (only for {@link ImmutableSegmentImpl} instances on dedup tables) and the
     * segment is then added through the superclass.
     *
     * @param immutableSegment segment to add
     * @throws IllegalStateException if this table data manager has already been shut down
     */
    @Override // org.apache.pinot.core.data.manager.BaseTableDataManager
    public void addSegment(ImmutableSegment immutableSegment) {
        Preconditions.checkState(!this._shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", immutableSegment.getSegmentName(), this._tableNameWithType);
        if (!isUpsertEnabled()) {
            boolean needsDedup = isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl;
            if (needsDedup) {
                handleDedup((ImmutableSegmentImpl) immutableSegment);
            }
            super.addSegment(immutableSegment);
        } else {
            handleUpsert(immutableSegment);
        }
    }

    /**
     * Wires an immutable segment into the dedup metadata of its partition: enables dedup on the
     * segment, then preloads it, adds it as new, or replaces the currently registered segment of the
     * same name, depending on the partition manager's preloading state and on whether a data manager
     * for the segment already exists.
     *
     * <p>All messages go through the instance logger, matching the upsert path.
     *
     * @param immutableSegmentImpl segment being added
     * @throws NullPointerException if the partition id of the segment cannot be determined
     */
    private void handleDedup(ImmutableSegmentImpl immutableSegmentImpl) {
        String segmentName = immutableSegmentImpl.getSegmentName();
        this._logger.info("Adding immutable segment: {} with dedup enabled", segmentName);
        Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, this._tableNameWithType, this._helixManager, (String) null);
        Preconditions.checkNotNull(partitionId, "PartitionId is not available for segment: '" + segmentName + "' (dedup-enabled table: " + this._tableNameWithType + ")");
        PartitionDedupMetadataManager partitionManager = this._tableDedupMetadataManager.getOrCreatePartitionManager(partitionId.intValue());
        immutableSegmentImpl.enableDedup(partitionManager);
        SegmentDataManager existingManager = this._segmentDataManagerMap.get(segmentName);
        if (partitionManager.isPreloading()) {
            partitionManager.preloadSegment(immutableSegmentImpl);
            this._logger.info("Preloaded immutable segment: {} with dedup enabled", segmentName);
        } else if (existingManager == null) {
            partitionManager.addSegment(immutableSegmentImpl);
            this._logger.info("Added new immutable segment: {} with dedup enabled", segmentName);
        } else {
            IndexSegment oldSegment = existingManager.getSegment();
            partitionManager.replaceSegment(oldSegment, immutableSegmentImpl);
            this._logger.info("Replaced {} segment: {} with dedup enabled", oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName);
        }
    }

    /**
     * Adds an immutable segment to an upsert-enabled table. Depending on the partition manager's
     * preloading state and on whether a data manager for the segment already exists, the segment is
     * preloaded, used to replace the existing one, or added as new.
     *
     * @param immutableSegment segment being added
     * @throws NullPointerException if the partition id of the segment cannot be determined
     */
    private void handleUpsert(ImmutableSegment immutableSegment) {
        String segmentName = immutableSegment.getSegmentName();
        this._logger.info("Adding immutable segment: {} with upsert enabled", segmentName);
        Integer realtimeSegmentPartitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, this._tableNameWithType, this._helixManager, (String) null);
        Preconditions.checkNotNull(realtimeSegmentPartitionId, "Failed to get partition id for segment: " + segmentName + " (upsert-enabled table: " + this._tableNameWithType + ")");
        PartitionUpsertMetadataManager orCreatePartitionManager = this._tableUpsertMetadataManager.getOrCreatePartitionManager(realtimeSegmentPartitionId.intValue());
        // Gauges are bumped up front, on all three paths below.
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.DOCUMENT_COUNT, immutableSegment.getSegmentMetadata().getTotalDocs());
        this._serverMetrics.addValueToTableGauge(this._tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
        ImmutableSegmentDataManager immutableSegmentDataManager = new ImmutableSegmentDataManager(immutableSegment);
        if (orCreatePartitionManager.isPreloading()) {
            // Preload path: the segment is preloaded first and registered afterwards.
            orCreatePartitionManager.preloadSegment(immutableSegment);
            registerSegment(segmentName, immutableSegmentDataManager, orCreatePartitionManager);
            this._logger.info("Preloaded immutable segment: {} with upsert enabled", segmentName);
            return;
        }
        SegmentDataManager segmentDataManager = this._segmentDataManagerMap.get(segmentName);
        if (segmentDataManager != null) {
            replaceUpsertSegment(segmentName, segmentDataManager, immutableSegmentDataManager, orCreatePartitionManager);
            return;
        }
        // New-segment path: register, then track, then add to the partition's upsert metadata.
        registerSegment(segmentName, immutableSegmentDataManager, orCreatePartitionManager);
        orCreatePartitionManager.trackNewlyAddedSegment(segmentName);
        orCreatePartitionManager.addSegment(immutableSegment);
        this._logger.info("Added new immutable segment: {} with upsert enabled", segmentName);
    }

    /**
     * Replaces the registered segment of an upsert table with a new immutable one, then offloads and
     * releases the old data manager.
     *
     * <p>With consistency mode {@code NONE} the upsert metadata is swapped before the new data manager
     * is registered. With any other mode a {@link DuoSegmentDataManager} wrapping both managers is
     * registered first, so both segments are registered while the metadata is swapped.
     *
     * @param str                            segment name
     * @param segmentDataManager             currently registered manager (old segment)
     * @param immutableSegmentDataManager    manager of the replacing segment
     * @param partitionUpsertMetadataManager upsert manager of the segment's partition
     */
    private void replaceUpsertSegment(String str, SegmentDataManager segmentDataManager, ImmutableSegmentDataManager immutableSegmentDataManager, PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
        IndexSegment oldSegment = segmentDataManager.getSegment();
        ImmutableSegment newSegment = immutableSegmentDataManager.m16getSegment();
        UpsertConfig.ConsistencyMode consistencyMode = this._tableUpsertMetadataManager.getUpsertConsistencyMode();

        if (consistencyMode != UpsertConfig.ConsistencyMode.NONE) {
            registerSegment(str, new DuoSegmentDataManager(immutableSegmentDataManager, segmentDataManager), partitionUpsertMetadataManager);
        }
        partitionUpsertMetadataManager.replaceSegment(newSegment, oldSegment);
        registerSegment(str, immutableSegmentDataManager, partitionUpsertMetadataManager);

        String oldSegmentKind = oldSegment instanceof ImmutableSegment ? "immutable" : "mutable";
        this._logger.info("Replaced {} segment: {} with upsert enabled and consistency mode: {}", oldSegmentKind, str, consistencyMode);
        segmentDataManager.offload();
        releaseSegment(segmentDataManager);
    }

    /**
     * Registers the data manager under the given segment name. When a partition upsert manager is
     * supplied, the segment is first passed to its {@code trackSegmentForUpsertView}.
     *
     * @param partitionUpsertMetadataManager upsert manager of the segment's partition, or {@code null}
     */
    private void registerSegment(String str, SegmentDataManager segmentDataManager, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
        if (partitionUpsertMetadataManager != null) {
            partitionUpsertMetadataManager.trackSegmentForUpsertView(segmentDataManager.getSegment());
        }
        registerSegment(str, segmentDataManager);
    }

    /**
     * Downloads the committed copy of a segment, loads it with the tier from the ZK metadata, and adds
     * it through {@link #addSegment(ImmutableSegment)}.
     *
     * @param segmentZKMetadata metadata of the committed segment
     * @throws Exception if downloading or loading the segment fails
     */
    public void downloadAndReplaceConsumingSegment(SegmentZKMetadata segmentZKMetadata) throws Exception {
        String segmentName = segmentZKMetadata.getSegmentName();
        this._logger.info("Downloading and replacing CONSUMING segment: {} with committed one", segmentName);
        File downloadedSegmentDir = downloadSegment(segmentZKMetadata);
        IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
        indexLoadingConfig.setSegmentTier(segmentZKMetadata.getTier());
        ImmutableSegment committedSegment = ImmutableSegmentLoader.load(downloadedSegmentDir, indexLoadingConfig);
        addSegment(committedSegment);
        this._logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
    }

    /**
     * Loads the segment stored under its name in the index directory and adds it through
     * {@link #addSegment(ImmutableSegment)}.
     *
     * @param str segment name
     * @throws Exception if loading the segment fails
     */
    public void replaceConsumingSegment(String str) throws Exception {
        this._logger.info("Replacing CONSUMING segment: {} with the one sealed locally", str);
        File sealedSegmentDir = new File(this._indexDir, str);
        ImmutableSegment sealedSegment = ImmutableSegmentLoader.load(sealedSegmentDir, fetchIndexLoadingConfig());
        addSegment(sealedSegment);
        this._logger.info("Replaced CONSUMING segment: {}", str);
    }

    /** Returns this manager's instance id. */
    public String getServerInstance() {
        return this._instanceId;
    }

    /** Returns the upsert metadata manager, or {@code null} when none was created in {@code doInit()}. */
    @VisibleForTesting
    public TableUpsertMetadataManager getTableUpsertMetadataManager() {
        return this._tableUpsertMetadataManager;
    }

    /**
     * Returns the upsert metadata manager's partition-to-primary-key-count map, or an empty map when
     * upsert is not enabled.
     */
    public Map<Integer, Long> getUpsertPartitionToPrimaryKeyCount() {
        if (!isUpsertEnabled()) {
            return Collections.emptyMap();
        }
        return this._tableUpsertMetadataManager.getPartitionToPrimaryKeyCount();
    }

    /**
     * Checks the sorted-column configuration against the schema and then runs the general schema
     * validation. Only the first configured sorted column is checked; extra entries produce a warning.
     *
     * @throws IllegalArgumentException if the sorted column is missing from the schema or is not
     *                                  single-valued
     */
    private void validate(TableConfig tableConfig, Schema schema) {
        List<?> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn();
        if (sortedColumns != null && !sortedColumns.isEmpty()) {
            String sortedColumn = (String) sortedColumns.get(0);
            if (sortedColumns.size() > 1) {
                this._logger.warn("More than one sorted column configured. Using {}", sortedColumn);
            }
            FieldSpec sortedFieldSpec = schema.getFieldSpecFor(sortedColumn);
            Preconditions.checkArgument(sortedFieldSpec != null, "Failed to find sorted column: %s in schema for table: %s", sortedColumn, this._tableNameWithType);
            Preconditions.checkArgument(sortedFieldSpec.isSingleValueField(), "Cannot configure multi-valued column %s as sorted column for table: %s", sortedColumn, this._tableNameWithType);
        }
        SchemaUtils.validate(schema);
    }

    static {
        // Captures whether assertions are disabled for this class.
        $assertionsDisabled = !RealtimeTableDataManager.class.desiredAssertionStatus();
        // Readiness re-checks are spaced at least 5 seconds apart.
        READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
    }
}
