+ /**
+ * @param requestId UUID to track the incoming request
+ * @param instanceId UUID to identify the requesting instance
+ * @param accept Determines the format of the body of the response
+ * @param authorization Username and password auth towards SDC
+ * @return KafkaDataResponse (Kafka bootstrap server and topic list to be used by clients)
+ */
+ @GET
+ @Path("/distributionKafkaData")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Operation(description = "Kafka data", method = "GET", summary = "return the kafka cluster and topic list", responses = {
+ @ApiResponse(responseCode = "200", description = "ECOMP component is authenticated and kafka endpoint and topic list is returned", content = @Content(array = @ArraySchema(schema = @Schema(implementation = KafkaDataResponse.class)))),
+ @ApiResponse(responseCode = "400", description = "Missing 'X-ECOMP-InstanceID' HTTP header - POL5001"),
+ @ApiResponse(responseCode = "401", description = "ECOMP component should authenticate itself and to re-send again HTTP request with its credentials for Basic Authentication - POL5002"),
+ @ApiResponse(responseCode = "403", description = "ECOMP component is not authorized - POL5003"),
+ @ApiResponse(responseCode = "405", description = "Method Not Allowed: Invalid HTTP method type used ( PUT,DELETE,POST will be rejected) - POL4050"),
+ @ApiResponse(responseCode = "500", description = "The GET request failed either due to internal SDC problem or Cambria Service failure. ECOMP Component should continue the attempts to get the needed information - POL5000")})
+ public Response getKafkaData(
+ @Parameter(description = "X-ECOMP-RequestID header", required = false) @HeaderParam(value = Constants.X_ECOMP_REQUEST_ID_HEADER) String requestId,
+ @Parameter(description = "X-ECOMP-InstanceID header", required = true) @HeaderParam(value = Constants.X_ECOMP_INSTANCE_ID_HEADER) String instanceId,
+ @Parameter(description = "Determines the format of the body of the response", required = false) @HeaderParam(value = Constants.ACCEPT_HEADER) String accept,
+ @Parameter(description = "The username and password", required = true) @HeaderParam(value = Constants.AUTHORIZATION_HEADER) String authorization) {
+ String url = request.getMethod() + " " + request.getRequestURI();
+ log.debug(START_HANDLE_REQUEST_OF, url);
+ ResponseFormat responseFormat;
+ if (instanceId == null) {
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.MISSING_X_ECOMP_INSTANCE_ID);
+ getComponentsUtils().auditGetUebCluster(null, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return buildErrorResponse(responseFormat);
+ }
+ try {
+ Response response;
+ Either<KafkaDataResponse, ResponseFormat> actionResponse = distributionLogic.getKafkaData();
+ if (actionResponse.isRight()) {
+ responseFormat = actionResponse.right().value();
+ response = buildErrorResponse(responseFormat);
+ } else {
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.OK);
+ response = buildOkResponse(responseFormat, actionResponse.left().value());
+ }
+ getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return response;
+ } catch (Exception e) {
+ BeEcompErrorManager.getInstance().logBeRestApiGeneralError("failed to get kafka cluster and topic list from configuration");
+ log.debug("failed to get kafka cluster and topic list from configuration", e);
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.GENERAL_ERROR);
+ getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return buildErrorResponse(responseFormat);
+ }
+ }
+