2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 
  20 import com.fasterxml.jackson.core.JsonProcessingException;
 
  21 import com.fasterxml.jackson.databind.JsonNode;
 
  22 import java.time.Instant;
 
  23 import java.time.ZoneId;
 
  24 import java.util.Iterator;
 
  26 import org.slf4j.Logger;
 
  27 import org.slf4j.LoggerFactory;
 
  29 public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
 
  31     private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
 
  33     public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
 
  35         LOG.info("DMaaPCMVESMsgConsumer started successfully");
 
  39     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
 
  40         LOG.debug("Processing CM message {}", msg);
 
  41         JsonNode rootNode = convertMessageToJsonNode(msg);
 
  44             String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
 
  45             String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
 
  47             if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
 
  48                 LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", cmNodeId);
 
  49                 processMoiChanges(rootNode);
 
  50             } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
 
  51                 LOG.info("Read CM message from DMaaP topic that is moiCreation type with id {}", cmNodeId);
 
  52                 sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
 
  53             } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
 
  54                 LOG.info("Read CM message from DMaaP topic that is moiDeletion type with id {}", cmNodeId);
 
  55                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
 
  56             } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
 
  57                 LOG.info("Read CM message from DMaaP topic that is moiAttributeValueChanges type with id {}", cmNodeId);
 
  58                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
 
  60                 LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
 
  61                 throw new InvalidMessageException();
 
  64         } catch (NullPointerException e) {
 
  65             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
 
  66             throw new InvalidMessageException("Missing field");
 
  70     private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
 
  71         return CMBasicHeaderFieldsNotification.builder()
 
  72             .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
 
  73             .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
 
  74             .withCMOccurrenceTime(Instant
 
  76                     rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
 
  77                 .atZone(ZoneId.of("Z")).toString())
 
  78             .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
 
  79             .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
 
  83     private void processMoiChanges(JsonNode rootNode) {
 
  84         Iterator<JsonNode> nodes = rootNode
 
  85             .at("/event/stndDefinedFields/data/moiChanges")
 
  87         while (nodes.hasNext()) {
 
  88             sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
 
  92     public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
 
  93         JsonNode slaidNode = nodes.next();
 
  94         return CMNotificationClient.createCMNotificationPayloadMap(
 
  95             CMNotification.builder()
 
  96                 .withCMBasicHeaderFieldsNotification(
 
  97                     prepareCMCommonHeaderFields(rootNode))
 
  98                 .withCMNotificationId(slaidNode.get("notificationId").toString())
 
  99                 .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
 
 100                 .withCMPath(slaidNode.get("path").textValue())
 
 101                 .withCMOperation(slaidNode.get("operation").textValue())
 
 102                 .withCMValue(slaidNode.get("value").toString()
 
 107     public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
 
 108         return CMNotificationClient.createCMNotificationPayloadMap(
 
 109             CMNotification.builder()
 
 110                 .withCMBasicHeaderFieldsNotification(
 
 111                     prepareCMCommonHeaderFields(rootNode))
 
 112                 .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
 
 113                 .withCMValue(rootNode.at(cmValueKey).toString()
 
 118     private void sendCMNotification(Map<String, String> payloadMapMessage) {
 
 119         CMNotificationClient cmClient = setRESTConfAuthorization();
 
 120         String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
 
 121         cmClient.sendNotification(message);
 
 125     private CMNotificationClient setRESTConfAuthorization() {
 
 126         String sdnrUser = getSDNRUser();
 
 127         String sdnrPasswd = getSDNRPasswd();
 
 129         CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
 
 130         LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
 
 131         cmClient.setAuthorization(sdnrUser, sdnrPasswd);