fix oauth code
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / vesdomain / cm / StrimziKafkaCMVESMsgConsumer.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import com.fasterxml.jackson.databind.JsonNode;
22 import java.time.Instant;
23 import java.time.ZoneId;
24 import java.util.Iterator;
25 import java.util.Map;
26
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
34
35     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
36
37     public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
38         super(generalConfig);
39         LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
40     }
41
42     @Override
43     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
44         LOG.debug("Processing CM message {}", msg);
45         JsonNode rootNode = convertMessageToJsonNode(msg);
46         try {
47
48             String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
49             String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
50
51             if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
52                 LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId);
53                 processMoiChanges(rootNode);
54             } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
55                 LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId);
56                 sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
57             } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
58                 LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId);
59                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
60             } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
61                 LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId);
62                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
63             } else {
64                 LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
65                 throw new InvalidMessageException();
66             }
67
68         } catch (NullPointerException e) {
69             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
70             throw new InvalidMessageException("Missing field");
71         }
72     }
73
74     private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
75         return CMBasicHeaderFieldsNotification.builder()
76             .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
77             .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
78             .withCMOccurrenceTime(Instant
79                 .ofEpochMilli(
80                     rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
81                 .atZone(ZoneId.of("Z")).toString())
82             .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
83             .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
84             .build();
85     }
86
87     private void processMoiChanges(JsonNode rootNode) {
88         Iterator<JsonNode> nodes = rootNode
89             .at("/event/stndDefinedFields/data/moiChanges")
90             .elements();
91         while (nodes.hasNext()) {
92             sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
93         }
94     }
95
96     public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
97         JsonNode slaidNode = nodes.next();
98         return CMNotificationClient.createCMNotificationPayloadMap(
99             CMNotification.builder()
100                 .withCMBasicHeaderFieldsNotification(
101                     prepareCMCommonHeaderFields(rootNode))
102                 .withCMNotificationId(slaidNode.get("notificationId").toString())
103                 .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
104                 .withCMPath(slaidNode.get("path").textValue())
105                 .withCMOperation(slaidNode.get("operation").textValue())
106                 .withCMValue(slaidNode.get("value").toString()
107                     .replace("\"", ""))
108                 .build());
109     }
110
111     public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
112         return CMNotificationClient.createCMNotificationPayloadMap(
113             CMNotification.builder()
114                 .withCMBasicHeaderFieldsNotification(
115                     prepareCMCommonHeaderFields(rootNode))
116                 .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
117                 .withCMValue(rootNode.at(cmValueKey).toString()
118                     .replace("\"", ""))
119                 .build());
120     }
121
122     private void sendCMNotification(Map<String, String> payloadMapMessage) {
123         CMNotificationClient cmClient = setRESTConfAuthorization();
124         String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
125         cmClient.sendNotification(message);
126     }
127
128
129     private CMNotificationClient setRESTConfAuthorization() {
130         String sdnrUser = getSDNRUser();
131         String sdnrPasswd = getSDNRPasswd();
132
133         CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
134         LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
135         cmClient.setAuthorization(sdnrUser, sdnrPasswd);
136         return cmClient;
137     }
138 }