package org.apache.pinot.controller.api.resources;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Api(tags = {Constants.TABLE_TAG}, authorizations = {@Authorization(CommonConstants.SWAGGER_AUTHORIZATION_KEY)})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {@ApiKeyAuthDefinition(name = "Authorization", in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = CommonConstants.SWAGGER_AUTHORIZATION_KEY)}))
@Path("/")
/* loaded from: input_file:org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.class */
public class PinotRealtimeTableResource {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) PinotRealtimeTableResource.class);

    @Inject
    ControllerConf _controllerConf;

    @Inject
    Executor _executor;

    @Inject
    HttpConnectionManager _connectionManager;

    @Inject
    PinotHelixResourceManager _pinotHelixResourceManager;

    @Inject
    PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;

    @Path("/tables/{tableName}/pauseConsumption")
    @ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table")
    @POST
    @Produces({"application/json"})
    public Response pauseConsumption(@PathParam("tableName") @ApiParam(value = "Name of the table", required = true) String str) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(str);
        validate(tableNameWithType);
        try {
            return Response.ok(this._pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
        } catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    @Path("/tables/{tableName}/resumeConsumption")
    @ApiOperation(value = "Resume consumption of a realtime table", notes = "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are picked to minimize the data loss.")
    @POST
    @Produces({"application/json"})
    public Response resumeConsumption(@PathParam("tableName") @ApiParam(value = "Name of the table", required = true) String str, @QueryParam("consumeFrom") @ApiParam("smallest | largest") String str2) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(str);
        validate(tableNameWithType);
        if (str2 != null && !str2.equalsIgnoreCase("smallest") && !str2.equalsIgnoreCase("largest")) {
            throw new ControllerApplicationException(LOGGER, String.format("consumeFrom param '%s' is not valid.", str2), Response.Status.BAD_REQUEST);
        }
        try {
            return Response.ok(this._pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, str2)).build();
        } catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    @POST
    @Path("/tables/{tableName}/forceCommit")
    @ApiOperation(value = "Force commit the current consuming segments", notes = "Force commit the current segments in consuming state and restart consumption. This should be used after schema/table config changes. Please note that this is an asynchronous operation, and 200 response does not mean it has actually been done already")
    public Response forceCommit(@PathParam("tableName") @ApiParam(value = "Name of the table", required = true) String str) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(str);
        validate(tableNameWithType);
        try {
            this._pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
            return Response.ok().build();
        } catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    @GET
    @Path("/tables/{tableName}/pauseStatus")
    @ApiOperation(value = "Return pause status of a realtime table", notes = "Return pause status of a realtime table along with list of consuming segments.")
    @Produces({"application/json"})
    public Response getPauseStatus(@PathParam("tableName") @ApiParam(value = "Name of the table", required = true) String str) {
        String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(str);
        validate(tableNameWithType);
        try {
            return Response.ok().entity(this._pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
        } catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    @GET
    @ApiResponses({@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Table not found"), @ApiResponse(code = 500, message = "Internal server error")})
    @Path("/tables/{tableName}/consumingSegmentsInfo")
    @ApiOperation(value = "Returns state of consuming segments", notes = "Gets the status of consumers from all servers.Note that the partitionToOffsetMap has been deprecated and will be removed in the next release. The info is now embedded within each partition's state as currentOffsetsMap.")
    @Produces({"application/json"})
    public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo(@PathParam("tableName") @ApiParam(value = "Realtime table name with or without type", required = true, example = "myTable | myTable_REALTIME") String str) {
        try {
            if (TableType.OFFLINE == TableNameBuilder.getTableTypeFromTableName(str)) {
                throw new IllegalStateException("Cannot get consuming segments info for OFFLINE table: " + str);
            }
            return new ConsumingSegmentInfoReader(this._executor, this._connectionManager, this._pinotHelixResourceManager).getConsumingSegmentsInfo(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(str), this._controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
        } catch (Exception e) {
            throw new ControllerApplicationException(LOGGER, String.format("Failed to get consuming segments info for table %s. %s", str, e.getMessage()), Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    private void validate(String str) {
        IdealState tableIdealState = this._pinotHelixResourceManager.getTableIdealState(str);
        if (tableIdealState == null) {
            throw new ControllerApplicationException(LOGGER, String.format("Table %s not found!", str), Response.Status.NOT_FOUND);
        }
        if (!tableIdealState.isEnabled()) {
            throw new ControllerApplicationException(LOGGER, String.format("Table %s is disabled!", str), Response.Status.BAD_REQUEST);
        }
    }
}
