Extend SDNC persistent service to store CM
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / impl / DMaaPCMVESMsgConsumer.java
index 245807e..8412e37 100644 (file)
 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.Iterator;
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,19 +39,96 @@ public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
         LOG.debug("Processing CM message {}", msg);
         JsonNode rootNode = convertMessageToJsonNode(msg);
-        JsonNode dataNode;
-        JsonNode notificationNode;
         try {
-            dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull();
-            if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) {
-                notificationNode = dataNode.get("moiChanges");
-                LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId"));
+
+            String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
+            String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
+
+            if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
+                LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
+                processMoiChanges(rootNode);
+            } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
+                LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
+                sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
+            } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
+                LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
+                sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
+            } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
+                LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
+                sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
+            } else {
+                LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
+                throw new InvalidMessageException();
             }
+
         } catch (NullPointerException e) {
             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
             throw new InvalidMessageException("Missing field");
         }
-        // take required data from notificationNode
     }
 
+    private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
+        return CMBasicHeaderFieldsNotification.builder()
+            .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
+            .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
+            .withCMOccurrenceTime(Instant
+                .ofEpochMilli(
+                    rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
+                .atZone(ZoneId.of("Z")).toString())
+            .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
+            .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
+            .build();
+    }
+
+    private void processMoiChanges(JsonNode rootNode) {
+        Iterator<JsonNode> nodes = rootNode
+            .at("/event/stndDefinedFields/data/moiChanges")
+            .elements();
+        while (nodes.hasNext()) {
+            sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
+        }
+    }
+
+    public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
+        JsonNode slaidNode = nodes.next();
+        return CMNotificationClient.createCMNotificationPayloadMap(
+            CMNotification.builder()
+                .withCMBasicHeaderFieldsNotification(
+                    prepareCMCommonHeaderFields(rootNode))
+                .withCMNotificationId(slaidNode.get("notificationId").toString())
+                .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
+                .withCMPath(slaidNode.get("path").textValue())
+                .withCMOperation(slaidNode.get("operation").textValue())
+                .withCMValue(slaidNode.get("value").toString()
+                    .replace("\"", ""))
+                .build());
+    }
+
+    public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
+        return CMNotificationClient.createCMNotificationPayloadMap(
+            CMNotification.builder()
+                .withCMBasicHeaderFieldsNotification(
+                    prepareCMCommonHeaderFields(rootNode))
+                .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
+                .withCMValue(rootNode.at(cmValueKey).toString()
+                    .replace("\"", ""))
+                .build());
+    }
+
+    private void sendCMNotification(Map<String, String> payloadMapMessage) {
+        CMNotificationClient cmClient = setRESTConfAuthorization();
+        String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
+        cmClient.sendNotification(message);
+    }
+
+
+    private CMNotificationClient setRESTConfAuthorization() {
+        String sdnrUser = getSDNRUser();
+        String sdnrPasswd = getSDNRPasswd();
+
+        CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
+        LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+        cmClient.setAuthorization(sdnrUser, sdnrPasswd);
+        return cmClient;
+    }
 }