8412e3730f6618234887175b1e2c563c90805c5c
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import com.fasterxml.jackson.databind.JsonNode;
22 import java.time.Instant;
23 import java.time.ZoneId;
24 import java.util.Iterator;
25 import java.util.Map;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
30
31     private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
32
33     public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
34         super(generalConfig);
35         LOG.info("DMaaPCMVESMsgConsumer started successfully");
36     }
37
38     @Override
39     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
40         LOG.debug("Processing CM message {}", msg);
41         JsonNode rootNode = convertMessageToJsonNode(msg);
42         try {
43
44             String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
45             String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
46
47             if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
48                 LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
49                 processMoiChanges(rootNode);
50             } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
51                 LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
52                 sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
53             } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
54                 LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
55                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
56             } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
57                 LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
58                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
59             } else {
60                 LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
61                 throw new InvalidMessageException();
62             }
63
64         } catch (NullPointerException e) {
65             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
66             throw new InvalidMessageException("Missing field");
67         }
68     }
69
70     private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
71         return CMBasicHeaderFieldsNotification.builder()
72             .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
73             .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
74             .withCMOccurrenceTime(Instant
75                 .ofEpochMilli(
76                     rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
77                 .atZone(ZoneId.of("Z")).toString())
78             .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
79             .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
80             .build();
81     }
82
83     private void processMoiChanges(JsonNode rootNode) {
84         Iterator<JsonNode> nodes = rootNode
85             .at("/event/stndDefinedFields/data/moiChanges")
86             .elements();
87         while (nodes.hasNext()) {
88             sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
89         }
90     }
91
92     public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
93         JsonNode slaidNode = nodes.next();
94         return CMNotificationClient.createCMNotificationPayloadMap(
95             CMNotification.builder()
96                 .withCMBasicHeaderFieldsNotification(
97                     prepareCMCommonHeaderFields(rootNode))
98                 .withCMNotificationId(slaidNode.get("notificationId").toString())
99                 .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
100                 .withCMPath(slaidNode.get("path").textValue())
101                 .withCMOperation(slaidNode.get("operation").textValue())
102                 .withCMValue(slaidNode.get("value").toString()
103                     .replace("\"", ""))
104                 .build());
105     }
106
107     public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
108         return CMNotificationClient.createCMNotificationPayloadMap(
109             CMNotification.builder()
110                 .withCMBasicHeaderFieldsNotification(
111                     prepareCMCommonHeaderFields(rootNode))
112                 .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
113                 .withCMValue(rootNode.at(cmValueKey).toString()
114                     .replace("\"", ""))
115                 .build());
116     }
117
118     private void sendCMNotification(Map<String, String> payloadMapMessage) {
119         CMNotificationClient cmClient = setRESTConfAuthorization();
120         String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
121         cmClient.sendNotification(message);
122     }
123
124
125     private CMNotificationClient setRESTConfAuthorization() {
126         String sdnrUser = getSDNRUser();
127         String sdnrPasswd = getSDNRPasswd();
128
129         CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
130         LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
131         cmClient.setAuthorization(sdnrUser, sdnrPasswd);
132         return cmClient;
133     }
134 }