2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.rest.stub.controller;
23 import static org.onap.cps.ncmp.dmi.rest.stub.utils.ModuleResponseType.MODULE_REFERENCE_RESPONSE;
24 import static org.onap.cps.ncmp.dmi.rest.stub.utils.ModuleResponseType.MODULE_RESOURCE_RESPONSE;
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.JsonNode;
28 import com.fasterxml.jackson.databind.ObjectMapper;
29 import io.cloudevents.CloudEvent;
30 import io.cloudevents.core.builder.CloudEventBuilder;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.UUID;
37 import java.util.concurrent.atomic.AtomicInteger;
38 import java.util.stream.Collectors;
39 import lombok.RequiredArgsConstructor;
40 import lombok.extern.slf4j.Slf4j;
41 import org.json.simple.parser.JSONParser;
42 import org.json.simple.parser.ParseException;
43 import org.onap.cps.ncmp.dmi.datajobs.model.SubjobWriteRequest;
44 import org.onap.cps.ncmp.dmi.datajobs.model.SubjobWriteResponse;
45 import org.onap.cps.ncmp.dmi.rest.stub.controller.aop.ModuleInitialProcess;
46 import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.DataOperationRequest;
47 import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.DmiDataOperationRequest;
48 import org.onap.cps.ncmp.dmi.rest.stub.model.data.operational.DmiOperationCmHandle;
49 import org.onap.cps.ncmp.dmi.rest.stub.service.YangModuleFactory;
50 import org.onap.cps.ncmp.dmi.rest.stub.utils.EventDateTimeFormatter;
51 import org.onap.cps.ncmp.dmi.rest.stub.utils.ModuleResponseType;
52 import org.onap.cps.ncmp.dmi.rest.stub.utils.ResourceFileReaderUtil;
53 import org.onap.cps.ncmp.events.async1_0_0.Data;
54 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
55 import org.onap.cps.ncmp.events.async1_0_0.Response;
56 import org.springframework.beans.factory.annotation.Value;
57 import org.springframework.context.ApplicationContext;
58 import org.springframework.core.io.ResourceLoader;
59 import org.springframework.http.HttpStatus;
60 import org.springframework.http.ResponseEntity;
61 import org.springframework.kafka.core.KafkaTemplate;
62 import org.springframework.web.bind.annotation.DeleteMapping;
63 import org.springframework.web.bind.annotation.GetMapping;
64 import org.springframework.web.bind.annotation.PathVariable;
65 import org.springframework.web.bind.annotation.PostMapping;
66 import org.springframework.web.bind.annotation.PutMapping;
67 import org.springframework.web.bind.annotation.RequestBody;
68 import org.springframework.web.bind.annotation.RequestHeader;
69 import org.springframework.web.bind.annotation.RequestMapping;
70 import org.springframework.web.bind.annotation.RequestParam;
71 import org.springframework.web.bind.annotation.RestController;
72 import org.springframework.web.servlet.support.ServletUriComponentsBuilder;
75 @RequestMapping("${rest.api.dmi-stub-base-path}")
77 @RequiredArgsConstructor
78 public class DmiRestStubController {
// Operation type returned by getPassthroughOperationType when the request body is not valid JSON.
private static final String DEFAULT_PASSTHROUGH_OPERATION = "read";
// CloudEvent type of the published data operation events; also embedded in their data schema URN.
private static final String DATA_OPERATION_EVENT_TYPE = "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent";
// In-memory cmHandleId -> moduleSetTag mapping maintained through the /v1/tagMapping endpoints.
// NOTE(review): plain static HashMap mutated from request handlers - confirm concurrent access is acceptable.
private static final Map<String, String> moduleSetTagPerCmHandleId = new HashMap<>();
// Module set tags accepted by processModuleRequest; any other requested tag falls back to DEFAULT_TAG.
private static final List<String> MODULE_SET_TAGS = YangModuleFactory.generateTags();
private static final String DEFAULT_TAG = "tagDefault";

// NOTE(review): declaration order of the uninitialised final fields presumably defines the parameter order of the
// constructor generated by @RequiredArgsConstructor - do not reorder without checking callers.
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
private final ObjectMapper objectMapper;
private final ApplicationContext applicationContext;
// Incremented once per /v1/cmwriteJob request; the new value is returned in the sub-job write response.
private final AtomicInteger subJobWriteRequestCounter = new AtomicInteger();
private final YangModuleFactory yangModuleFactory;

// Kafka topic the data operation cloud events are sent to.
@Value("${app.ncmp.async-m2m.topic}")
private String ncmpAsyncM2mTopic;
// Simulated response delays (milliseconds) applied by the corresponding endpoints.
@Value("${delay.module-references-delay-ms}")
private long moduleReferencesDelayMs;
@Value("${delay.module-resources-delay-ms}")
private long moduleResourcesDelayMs;
@Value("${delay.read-data-for-cm-handle-delay-ms}")
private long readDataForCmHandleDelayMs;
@Value("${delay.write-data-for-cm-handle-delay-ms}")
private long writeDataForCmHandleDelayMs;
104 * This code defines a REST API endpoint for adding new the module set tag mapping. The endpoint receives the
105 * cmHandleId and moduleSetTag as request body and add into moduleSetTagPerCmHandleId map with the provided
108 * @param requestBody map of cmHandleId and moduleSetTag
109 * @return a ResponseEntity object containing the updated moduleSetTagPerCmHandleId map as the response body
111 @PostMapping("/v1/tagMapping")
112 public ResponseEntity<Map<String, String>> addTagForMapping(@RequestBody final Map<String, String> requestBody) {
113 log.info("tagMapping: add {}", requestBody);
114 moduleSetTagPerCmHandleId.putAll(requestBody);
115 return new ResponseEntity<>(requestBody, HttpStatus.CREATED);
119 * This code defines a GET endpoint of module set tag mapping.
121 * @return The map represents the module set tag mapping.
123 @GetMapping("/v1/tagMapping")
124 public ResponseEntity<Map<String, String>> getTagMapping() {
125 log.info("tagMapping: get");
126 return ResponseEntity.ok(moduleSetTagPerCmHandleId);
130 * This code defines a GET endpoint of module set tag by cm handle ID.
132 * @return The map represents the module set tag mapping filtered by cm handle ID.
134 @GetMapping("/v1/tagMapping/ch/{cmHandleId}")
135 public ResponseEntity<String> getTagMappingByCmHandleId(@PathVariable final String cmHandleId) {
136 log.info("tagMapping: get cm handle id: {}", cmHandleId);
137 return ResponseEntity.ok(moduleSetTagPerCmHandleId.get(cmHandleId));
141 * This code defines a REST API endpoint for updating the module set tag mapping. The endpoint receives the
142 * cmHandleId and moduleSetTag as request body and updates the moduleSetTagPerCmHandleId map with the provided
145 * @param requestBody map of cmHandleId and moduleSetTag
146 * @return a ResponseEntity object containing the updated moduleSetTagPerCmHandleId map as the response body
149 @PutMapping("/v1/tagMapping")
150 public ResponseEntity<Map<String, String>> updateTagMapping(@RequestBody final Map<String, String> requestBody) {
151 log.info("tagMapping: update: {}", requestBody);
152 moduleSetTagPerCmHandleId.putAll(requestBody);
153 return ResponseEntity.noContent().build();
157 * It contains a method to delete an entry from the moduleSetTagPerCmHandleId map.
158 * The method takes a cmHandleId as a parameter and removes the corresponding entry from the map.
160 * @return a ResponseEntity containing the updated map.
162 @DeleteMapping("/v1/tagMapping/ch/{cmHandleId}")
163 public ResponseEntity<String> deleteTagMappingByCmHandleId(@PathVariable final String cmHandleId) {
164 log.info("tagMapping: remove cm handle id: {}", cmHandleId);
165 moduleSetTagPerCmHandleId.remove(cmHandleId);
166 return ResponseEntity.ok(String.format("Mapping of %s is deleted successfully", cmHandleId));
170 * Get all modules for given cm handle.
172 * @param cmHandleId The identifier for a network function, network element, subnetwork,
173 * or any other cm object by managed Network CM Proxy
174 * @param moduleReferencesRequest module references request body
175 * @return ResponseEntity response entity having module response as json string.
177 @PostMapping("/v1/ch/{cmHandleId}/modules")
178 @ModuleInitialProcess
179 public ResponseEntity<String> getModuleReferences(@PathVariable("cmHandleId") final String cmHandleId,
180 @RequestBody final Object moduleReferencesRequest) {
181 log.info("get module references for cm handle id: {}", cmHandleId);
182 return processModuleRequest(moduleReferencesRequest, MODULE_REFERENCE_RESPONSE, moduleReferencesDelayMs);
186 * Get module resources for a given cmHandleId.
188 * @param cmHandleId The identifier for a network function, network element, subnetwork,
189 * or any other cm object by managed Network CM Proxy
190 * @param moduleResourcesReadRequest module resources read request body
191 * @return ResponseEntity response entity having module resources response as json string.
193 @PostMapping("/v1/ch/{cmHandleId}/moduleResources")
194 @ModuleInitialProcess
195 public ResponseEntity<String> getModuleResources(
196 @PathVariable("cmHandleId") final String cmHandleId,
197 @RequestBody final Object moduleResourcesReadRequest) {
198 log.info("get module resources for cm handle id: {}", cmHandleId);
199 return processModuleRequest(moduleResourcesReadRequest, MODULE_RESOURCE_RESPONSE, moduleResourcesDelayMs);
203 * Create resource data from passthrough operational or running for a cm handle.
205 * @param cmHandleId The identifier for a network function, network element, subnetwork,
206 * or any other cm object by managed Network CM Proxy
207 * @param datastoreName datastore name
208 * @param resourceIdentifier resource identifier
209 * @param options options
210 * @param topic client given topic name
211 * @return (@ code ResponseEntity) response entity
213 @PostMapping("/v1/ch/{cmHandleId}/data/ds/{datastoreName}")
214 public ResponseEntity<String> getResourceDataForCmHandle(
215 @PathVariable("cmHandleId") final String cmHandleId,
216 @PathVariable("datastoreName") final String datastoreName,
217 @RequestParam(value = "resourceIdentifier") final String resourceIdentifier,
218 @RequestParam(value = "options", required = false) final String options,
219 @RequestParam(value = "topic", required = false) final String topic,
220 @RequestHeader(value = "Authorization", required = false) final String authorization,
221 @RequestBody final String requestBody) {
222 log.info("create resource data for cm handle id: {}", cmHandleId);
223 log.debug("DMI AUTH HEADER: {}", authorization);
224 final String passthroughOperationType = getPassthroughOperationType(requestBody);
225 if (passthroughOperationType.equals("read")) {
226 delay(readDataForCmHandleDelayMs);
228 delay(writeDataForCmHandleDelayMs);
230 log.debug("Logging request body {}", requestBody);
232 final String sampleJson = ResourceFileReaderUtil.getResourceFileContent(applicationContext.getResource(
233 ResourceLoader.CLASSPATH_URL_PREFIX + "data/ietf-network-topology-sample-rfc8345.json"));
234 return ResponseEntity.ok(sampleJson.replace("#network-id", getCompositeNetworkId(cmHandleId)));
238 * This method is not implemented for ONAP DMI plugin.
240 * @param topic client given topic name
241 * @param requestId requestId generated by NCMP as an ack for client
242 * @param dmiDataOperationRequest list of operation details
243 * @return (@ code ResponseEntity) response entity
245 @PostMapping("/v1/data")
246 public ResponseEntity<Void> getResourceDataForCmHandleDataOperation(
247 @RequestParam(value = "topic") final String topic,
248 @RequestParam(value = "requestId") final String requestId,
249 @RequestBody final DmiDataOperationRequest dmiDataOperationRequest) {
250 delay(writeDataForCmHandleDelayMs);
252 log.info("Request received from the NCMP to DMI Plugin: {}",
253 objectMapper.writeValueAsString(dmiDataOperationRequest));
254 } catch (final JsonProcessingException jsonProcessingException) {
255 log.warn("Unable to process dmi data operation request to json string");
257 dmiDataOperationRequest.getOperations().forEach(dmiDataOperation -> {
258 final DataOperationEvent dataOperationEvent = getDataOperationEvent(dmiDataOperation);
259 dmiDataOperation.getCmHandles().forEach(dmiOperationCmHandle -> {
260 log.debug("Module Set Tag received: {}", dmiOperationCmHandle.getModuleSetTag());
261 dataOperationEvent.getData().getResponses().get(0).setIds(List.of(dmiOperationCmHandle.getId()));
262 final CloudEvent cloudEvent = buildAndGetCloudEvent(topic, requestId, dataOperationEvent);
263 cloudEventKafkaTemplate.send(ncmpAsyncM2mTopic, UUID.randomUUID().toString(), cloudEvent);
266 return new ResponseEntity<>(HttpStatus.ACCEPTED);
270 * Consume sub-job write requests from NCMP.
272 * @param subJobWriteRequest contains a collection of write requests and metadata.
273 * @param destination the destination of the results. ( e.g. S3 Bucket).
274 * @return (@ code ResponseEntity) response for the write request.
276 @PostMapping("/v1/cmwriteJob")
277 public ResponseEntity<SubjobWriteResponse> consumeWriteSubJobs(
278 @RequestBody final SubjobWriteRequest subJobWriteRequest,
279 @RequestParam("destination") final String destination) {
280 log.info("cm write (datajob) request");
281 log.info("Destination: {}", destination);
282 log.info("Request body: {}", subJobWriteRequest);
283 return ResponseEntity.ok(new SubjobWriteResponse(String.valueOf(subJobWriteRequestCounter.incrementAndGet()),
284 "some-dmi-service-name", "my-data-producer-id"));
288 * Retrieves the status of a given data job identified by {@code requestId} and {@code dataProducerJobId}.
290 * @param dataProducerId ID of the producer registered by DMI for the alternateIDs
291 * in the operations in this request.
292 * @param dataProducerJobId Identifier of the data producer job.
293 * @return A ResponseEntity with HTTP status 200 (OK) and the data job's status as a string.
295 @GetMapping("/v1/cmwriteJob/dataProducer/{dataProducerId}/dataProducerJob/{dataProducerJobId}/status")
296 public ResponseEntity<Map<String, String>> retrieveDataJobStatus(
297 @PathVariable("dataProducerId") final String dataProducerId,
298 @PathVariable("dataProducerJobId") final String dataProducerJobId) {
299 log.info("Received request to retrieve data job status. Request ID: {}, Data Producer Job ID: {}",
300 dataProducerId, dataProducerJobId);
301 return ResponseEntity.ok(Map.of("status", "FINISHED"));
305 * Retrieves the result of a given data job identified by {@code requestId} and {@code dataProducerJobId}.
307 * @param dataProducerId Identifier for the data producer as a query parameter (required)
308 * @param dataProducerJobId Identifier for the data producer job (required)
309 * @param destination The destination of the results, Kafka topic name or s3 bucket name (required)
310 * @return A ResponseEntity with HTTP status 200 (OK) and the data job's result as an Object.
312 @GetMapping("/v1/cmwriteJob/dataProducer/{dataProducerId}/dataProducerJob/{dataProducerJobId}/result")
313 public ResponseEntity<Object> retrieveDataJobResult(
314 @PathVariable("dataProducerId") final String dataProducerId,
315 @PathVariable("dataProducerJobId") final String dataProducerJobId,
316 @RequestParam(name = "destination") final String destination) {
317 log.info("Received request to retrieve data job result. Data Producer ID: {}, "
318 + "Data Producer Job ID: {}, Destination: {}",
319 dataProducerId, dataProducerJobId, destination);
320 return ResponseEntity.ok(Map.of("result", "some status"));
323 private CloudEvent buildAndGetCloudEvent(final String topic, final String requestId,
324 final DataOperationEvent dataOperationEvent) {
325 CloudEvent cloudEvent = null;
327 cloudEvent = CloudEventBuilder.v1()
328 .withId(UUID.randomUUID().toString())
329 .withSource(URI.create("DMI"))
330 .withType(DATA_OPERATION_EVENT_TYPE)
331 .withDataSchema(URI.create("urn:cps:" + DATA_OPERATION_EVENT_TYPE + ":1.0.0"))
332 .withTime(EventDateTimeFormatter.toIsoOffsetDateTime(
333 EventDateTimeFormatter.getCurrentIsoFormattedDateTime()))
334 .withData(objectMapper.writeValueAsBytes(dataOperationEvent))
335 .withExtension("destination", topic)
336 .withExtension("correlationid", requestId)
338 } catch (final JsonProcessingException jsonProcessingException) {
339 log.error("Unable to parse event into bytes. cause : {}", jsonProcessingException.getMessage());
344 private DataOperationEvent getDataOperationEvent(final DataOperationRequest dataOperationRequest) {
345 final Response response = new Response();
347 response.setOperationId(dataOperationRequest.getOperationId());
348 response.setStatusCode("0");
349 response.setStatusMessage("Successfully applied changes");
350 response.setIds(dataOperationRequest.getCmHandles().stream().map(DmiOperationCmHandle::getId)
351 .collect(Collectors.toList()));
352 response.setResourceIdentifier(dataOperationRequest.getResourceIdentifier());
353 response.setOptions(dataOperationRequest.getOptions());
354 final String ietfNetworkTopologySample = ResourceFileReaderUtil.getResourceFileContent(
355 applicationContext.getResource(ResourceLoader.CLASSPATH_URL_PREFIX
356 + "data/ietf-network-topology-sample-rfc8345.json"));
357 final JSONParser jsonParser = new JSONParser();
359 response.setResult(jsonParser.parse(ietfNetworkTopologySample));
360 } catch (final ParseException parseException) {
361 log.error("Unable to parse event result as json object. cause : {}", parseException.getMessage());
363 final List<Response> responseList = new ArrayList<>(1);
364 responseList.add(response);
365 final Data data = new Data();
366 data.setResponses(responseList);
367 final DataOperationEvent dataOperationEvent = new DataOperationEvent();
368 dataOperationEvent.setData(data);
369 return dataOperationEvent;
372 private ResponseEntity<String> processModuleRequest(final Object moduleRequest,
373 final ModuleResponseType moduleResponseType,
374 final long simulatedResponseDelay) {
375 logRequestBody(moduleRequest);
376 String moduleResponseContent = "";
377 String moduleSetTag = extractModuleSetTagFromRequest(moduleRequest);
379 moduleSetTag = (!isModuleSetTagNullOrEmpty(moduleSetTag)
380 && MODULE_SET_TAGS.contains(moduleSetTag)) ? moduleSetTag : DEFAULT_TAG;
382 if (MODULE_RESOURCE_RESPONSE == moduleResponseType) {
383 moduleResponseContent = yangModuleFactory.getModuleResourcesJson(moduleSetTag);
385 moduleResponseContent = yangModuleFactory.getModuleReferencesJson(moduleSetTag);
388 delay(simulatedResponseDelay);
389 return ResponseEntity.ok(moduleResponseContent);
392 private String extractModuleSetTagFromRequest(final Object moduleReferencesRequest) {
393 final JsonNode rootNode = objectMapper.valueToTree(moduleReferencesRequest);
394 return rootNode.path("moduleSetTag").asText(null);
397 private boolean isModuleSetTagNullOrEmpty(final String moduleSetTag) {
398 return moduleSetTag == null || moduleSetTag.trim().isEmpty();
401 private void logRequestBody(final Object request) {
403 log.debug("Incoming DMI request body: {}", objectMapper.writeValueAsString(request));
404 } catch (final JsonProcessingException jsonProcessingException) {
405 log.warn("Unable to parse DMI request to json string");
409 private String getPassthroughOperationType(final String requestBody) {
411 final JsonNode rootNode = objectMapper.readTree(requestBody);
412 return rootNode.path("operation").asText();
413 } catch (final JsonProcessingException jsonProcessingException) {
414 log.error("Invalid JSON format. cause : {}", jsonProcessingException.getMessage());
416 return DEFAULT_PASSTHROUGH_OPERATION;
419 private void delay(final long milliseconds) {
421 Thread.sleep(milliseconds);
422 } catch (final InterruptedException e) {
423 log.error("Thread sleep interrupted: {}", e.getMessage());
424 Thread.currentThread().interrupt();
428 private static String getCompositeNetworkId(final String cmHandleId) {
429 final String servletUri = ServletUriComponentsBuilder
430 .fromCurrentContextPath() // scheme://host:port
433 return servletUri + "-" + cmHandleId; // e.g. http://cps-ncmp-dmi-stub-1:8092-my-cm-handle