package org.apache.pinot.server.api.resources;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import org.apache.pinot.segment.local.realtime.writer.StatelessRealtimeSegmentWriter;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags = {"Reingestion"}, authorizations = {@Authorization("oauth"), @Authorization("database")})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {@ApiKeyAuthDefinition(name = "Authorization", in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = "oauth", description = "The format of the key is  ```\"Basic <token>\" or \"Bearer <token>\"```"), @ApiKeyAuthDefinition(name = "database", in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = "database", description = "Database context passed through http header. If no context is provided 'default' database context will be considered.")}))
@Path("/")
/* loaded from: input_file:org/apache/pinot/server/api/resources/ReingestionResource.class */
public class ReingestionResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReingestionResource.class);
    private static final int MAX_PARALLEL_REINGESTIONS = Math.max(Runtime.getRuntime().availableProcessors() / 2, 1);
    public static final long CONSUMPTION_END_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
    public static final long CHECK_INTERVAL_MS = Duration.ofSeconds(5).toMillis();
    private final ConcurrentHashMap<String, AtomicBoolean> _reingestingSegments = new ConcurrentHashMap<>();
    private final ExecutorService _reingestionExecutor = Executors.newFixedThreadPool(MAX_PARALLEL_REINGESTIONS, new ThreadFactoryBuilder().setNameFormat("reingestion-worker-%d").build());
    private final ConcurrentHashMap<String, ReingestionJob> _runningJobs = new ConcurrentHashMap<>();

    @Inject
    private ServerInstance _serverInstance;

    /* loaded from: input_file:org/apache/pinot/server/api/resources/ReingestionResource$ReingestionJob.class */
    public static class ReingestionJob {
        private final String _jobId;
        private final String _segmentName;
        private final long _startTimeMs = System.currentTimeMillis();

        @JsonCreator
        ReingestionJob(@JsonProperty("jobId") String str, @JsonProperty("segmentName") String str2) {
            this._jobId = str;
            this._segmentName = str2;
        }

        public String getJobId() {
            return this._jobId;
        }

        public String getSegmentName() {
            return this._segmentName;
        }

        public long getStartTimeMs() {
            return this._startTimeMs;
        }
    }

    @GET
    @Path("/reingestSegment/jobs")
    @ApiOperation("Get all running re-ingestion jobs along with job IDs")
    @Produces({"application/json"})
    public Response getAllRunningReingestionJobs() {
        return Response.ok(new ArrayList(this._runningJobs.values())).build();
    }

    @ApiResponses({@ApiResponse(code = 200, message = "Success", response = ReingestionJob.class), @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class)})
    @Path("/reingestSegment/{segmentName}")
    @ApiOperation(value = "Re-ingest segment asynchronously", notes = "Returns a jobId immediately; ingestion runs in background.")
    @POST
    @Produces({"application/json"})
    public Response reingestSegment(@PathParam("segmentName") String str) {
        LOGGER.info("Re-ingesting segment: {}", str);
        if (!LLCSegmentName.isLLCSegment(str)) {
            throw new WebApplicationException("Segment name is not in LLC format: " + str, Response.Status.BAD_REQUEST);
        }
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(new LLCSegmentName(str).getTableName());
        RealtimeTableDataManager tableDataManager = this._serverInstance.getInstanceDataManager().getTableDataManager(tableNameWithType);
        if (tableDataManager == null) {
            throw new WebApplicationException("Table data manager not found for table: " + tableNameWithType, Response.Status.NOT_FOUND);
        }
        try {
            SegmentZKMetadata fetchZKMetadata = tableDataManager.fetchZKMetadata(str);
            if (fetchZKMetadata.getDownloadUrl() != null) {
                throw new WebApplicationException("Download URL is already present for segment: " + str + ". No need to re-ingest.", Response.Status.BAD_REQUEST);
            }
            String startOffset = fetchZKMetadata.getStartOffset();
            String endOffset = fetchZKMetadata.getEndOffset();
            if (startOffset == null || endOffset == null) {
                throw new WebApplicationException("Null start/end offset for segment: " + str, Response.Status.INTERNAL_SERVER_ERROR);
            }
            IndexLoadingConfig fetchIndexLoadingConfig = tableDataManager.fetchIndexLoadingConfig();
            if (!this._reingestingSegments.computeIfAbsent(str, str2 -> {
                return new AtomicBoolean(false);
            }).compareAndSet(false, true)) {
                return Response.status(Response.Status.CONFLICT).entity("Re-ingestion for segment: " + str + " is already in progress.").build();
            }
            String uuid = UUID.randomUUID().toString();
            ReingestionJob reingestionJob = new ReingestionJob(uuid, str);
            this._reingestionExecutor.submit(() -> {
                try {
                    try {
                        this._runningJobs.put(uuid, reingestionJob);
                        doReingestSegment(tableNameWithType, fetchZKMetadata, fetchIndexLoadingConfig, tableDataManager.getSegmentBuildSemaphore());
                        this._runningJobs.remove(uuid);
                        this._reingestingSegments.remove(str);
                    } catch (Exception e) {
                        LOGGER.error("Error during async re-ingestion for job {} (segment={})", new Object[]{uuid, str, e});
                        this._serverInstance.getServerMetrics().addMeteredTableValue(tableNameWithType, ServerMeter.SEGMENT_REINGESTION_FAILURE, 1L);
                        this._runningJobs.remove(uuid);
                        this._reingestingSegments.remove(str);
                    }
                } catch (Throwable th) {
                    this._runningJobs.remove(uuid);
                    this._reingestingSegments.remove(str);
                    throw th;
                }
            });
            return Response.ok(reingestionJob).build();
        } catch (Exception e) {
            throw new WebApplicationException("Segment ZK metadata not found for segment: " + str, Response.Status.NOT_FOUND);
        }
    }

    private void doReingestSegment(String str, SegmentZKMetadata segmentZKMetadata, IndexLoadingConfig indexLoadingConfig, @Nullable Semaphore semaphore) throws Exception {
        String segmentName = segmentZKMetadata.getSegmentName();
        StatelessRealtimeSegmentWriter statelessRealtimeSegmentWriter = new StatelessRealtimeSegmentWriter(segmentZKMetadata, indexLoadingConfig, semaphore);
        try {
            statelessRealtimeSegmentWriter.startConsumption();
            waitForCondition(r3 -> {
                return Boolean.valueOf(statelessRealtimeSegmentWriter.isDoneConsuming());
            }, CHECK_INTERVAL_MS, CONSUMPTION_END_TIMEOUT_MS, 0L);
            statelessRealtimeSegmentWriter.stopConsumption();
            if (!statelessRealtimeSegmentWriter.isSuccess()) {
                throw new RuntimeException("Failed to consume records", statelessRealtimeSegmentWriter.getConsumptionException());
            }
            new ServerSegmentCompletionProtocolHandler(this._serverInstance.getServerMetrics(), str).uploadReingestedSegment(segmentName, indexLoadingConfig.getSegmentStoreURI(), statelessRealtimeSegmentWriter.buildSegment());
            LOGGER.info("Re-ingested segment {} uploaded successfully", segmentName);
            statelessRealtimeSegmentWriter.close();
        } catch (Throwable th) {
            try {
                statelessRealtimeSegmentWriter.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void waitForCondition(Function<Void, Boolean> function, long j, long j2, long j3) {
        long currentTimeMillis = System.currentTimeMillis() + j2;
        if (j3 > 0) {
            LOGGER.info("Waiting for a grace period of {} ms before starting condition checks", Long.valueOf(j3));
            try {
                Thread.sleep(j3);
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted during grace period wait", e);
            }
        }
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                if (Boolean.TRUE.equals(function.apply((Object) null))) {
                    LOGGER.info("Condition satisfied: {}", function);
                    return;
                }
                Thread.sleep(j);
            } catch (Exception e2) {
                throw new RuntimeException("Caught exception while checking the condition", e2);
            }
        }
        throw new RuntimeException("Timeout waiting for condition: " + String.valueOf(function));
    }
}
