2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import com.fasterxml.jackson.databind.JsonNode;
22 import java.time.Instant;
23 import java.time.ZoneId;
24 import java.util.Iterator;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
35 private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
37 public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
39 LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
43 public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
44 LOG.debug("Processing CM message {}", msg);
45 JsonNode rootNode = convertMessageToJsonNode(msg);
48 String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
49 String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
51 if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
52 LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId);
53 processMoiChanges(rootNode);
54 } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
55 LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId);
56 sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
57 } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
58 LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId);
59 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
60 } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
61 LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId);
62 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
64 LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
65 throw new InvalidMessageException();
68 } catch (NullPointerException e) {
69 LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
70 throw new InvalidMessageException("Missing field");
74 private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
75 return CMBasicHeaderFieldsNotification.builder()
76 .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
77 .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
78 .withCMOccurrenceTime(Instant
80 rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
81 .atZone(ZoneId.of("Z")).toString())
82 .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
83 .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
87 private void processMoiChanges(JsonNode rootNode) {
88 Iterator<JsonNode> nodes = rootNode
89 .at("/event/stndDefinedFields/data/moiChanges")
91 while (nodes.hasNext()) {
92 sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
96 public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
97 JsonNode slaidNode = nodes.next();
98 return CMNotificationClient.createCMNotificationPayloadMap(
99 CMNotification.builder()
100 .withCMBasicHeaderFieldsNotification(
101 prepareCMCommonHeaderFields(rootNode))
102 .withCMNotificationId(slaidNode.get("notificationId").toString())
103 .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
104 .withCMPath(slaidNode.get("path").textValue())
105 .withCMOperation(slaidNode.get("operation").textValue())
106 .withCMValue(slaidNode.get("value").toString()
111 public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
112 return CMNotificationClient.createCMNotificationPayloadMap(
113 CMNotification.builder()
114 .withCMBasicHeaderFieldsNotification(
115 prepareCMCommonHeaderFields(rootNode))
116 .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
117 .withCMValue(rootNode.at(cmValueKey).toString()
122 private void sendCMNotification(Map<String, String> payloadMapMessage) {
123 CMNotificationClient cmClient = setRESTConfAuthorization();
124 String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
125 cmClient.sendNotification(message);
129 private CMNotificationClient setRESTConfAuthorization() {
130 String sdnrUser = getSDNRUser();
131 String sdnrPasswd = getSDNRPasswd();
133 CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
134 LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
135 cmClient.setAuthorization(sdnrUser, sdnrPasswd);