2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm;
 
  20 import com.fasterxml.jackson.core.JsonProcessingException;
 
  21 import com.fasterxml.jackson.databind.JsonNode;
 
  22 import java.time.Instant;
 
  23 import java.time.ZoneId;
 
  24 import java.util.Iterator;
 
  27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 
  28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
 
  29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
 
  30 import org.slf4j.Logger;
 
  31 import org.slf4j.LoggerFactory;
 
  /**
   * Consumer for CM VES messages, built on {@link StrimziKafkaVESMsgConsumerImpl}. Each message is
   * parsed to a JSON tree, classified by its {@code notificationType}, and forwarded as one or
   * more notifications through {@code CMNotificationClient}.
   *
   * <p>NOTE(review): the embedded line numbers of this listing have gaps (e.g. 34, 36, 38, 40-42),
   * so some statements and closing braces of the original file are not shown here.
   */
  33 public class StrimziKafkaCMVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
 
  35     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
 
  /**
   * Creates the consumer and logs at INFO that it started.
   *
   * @param generalConfig configuration object; no visible line of this constructor uses it -
   *        presumably it is handed to the superclass on the missing line 38 (TODO confirm)
   */
  37     public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
 
  39         LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
 
  /**
   * Processes one CM message: converts {@code msg} to a JSON tree with
   * {@code convertMessageToJsonNode}, reads {@code reportingEntityName} and
   * {@code notificationType}, and dispatches on the notification type (compared
   * case-insensitively):
   * <ul>
   *   <li>{@code notifyMOIChanges} - handled by {@code processMoiChanges}</li>
   *   <li>{@code notifyMOICreation} and {@code notifyMOIDeletion} - one notification whose value
   *       is read from {@code /event/stndDefinedFields/data/attributeList}</li>
   *   <li>{@code notifyMOIAttributeValueChanges} - one notification whose value is read from
   *       {@code /event/stndDefinedFields/data/attributeListValueChanges}</li>
   * </ul>
   *
   * <p>NOTE(review): missing fields are detected by catching {@code NullPointerException}
   * (presumably because {@code textValue()} / {@code get()} yield {@code null} for absent fields -
   * confirm against the Jackson version in use). The caught exception is neither logged nor
   * attached to the {@code InvalidMessageException}, so the name of the missing field is lost.
   * The catch also covers NPEs raised inside the helper methods called from the branches.
   *
   * @param msg raw message text; logged in full at DEBUG
   * @throws InvalidMessageException when the notification type is not one of the four handled
   *         values (thrown without a message), or when a {@code NullPointerException} occurs
   *         during processing (thrown with message {@code "Missing field"})
   * @throws JsonProcessingException declared by this method; presumably raised while parsing
   *         {@code msg} - TODO confirm
   */
  43     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
 
  44         LOG.debug("Processing CM message {}", msg);
 
  45         JsonNode rootNode = convertMessageToJsonNode(msg);
 
                 // cmNodeId is only used in the INFO log lines below.
  48             String cmNodeId = rootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
 
  49             String notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
 
                 // A null notificationType makes the first comparison throw NullPointerException,
                 // which is handled by the catch clause further down.
  51             if (notificationType.equalsIgnoreCase("notifyMOIChanges")) {
 
  52                 LOG.info("Read CM message from Kafka topic that is moiChanges type with id {}", cmNodeId);
 
  53                 processMoiChanges(rootNode);
 
  54             } else if (notificationType.equalsIgnoreCase("notifyMOICreation")) {
 
  55                 LOG.info("Read CM message from Kafka topic that is moiCreation type with id {}", cmNodeId);
 
  56                 sendCMNotification(preparePayloadMapFromMoi(rootNode, "/event/stndDefinedFields/data/attributeList"));
 
  57             } else if (notificationType.equalsIgnoreCase("notifyMOIDeletion")) {
 
  58                 LOG.info("Read CM message from Kafka topic that is moiDeletion type with id {}", cmNodeId);
 
                     // Deletion reads the same attributeList pointer as creation.
  59                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeList"));
 
  60             } else if (notificationType.equalsIgnoreCase("notifyMOIAttributeValueChanges")) {
 
  61                 LOG.info("Read CM message from Kafka topic that is moiAttributeValueChanges type with id {}", cmNodeId);
 
  62                 sendCMNotification(preparePayloadMapFromMoi(rootNode,"/event/stndDefinedFields/data/attributeListValueChanges"));
 
                     // Presumably the fallback branch for unrecognised types (line 63 is not shown).
  64                 LOG.warn("Message is invalid, sending aborted, wrong CM notification type {}", notificationType);
 
  65                 throw new InvalidMessageException();
 
  68         } catch (NullPointerException e) {
 
                 // NOTE(review): `e` is dropped here; neither the log line nor the new exception carries it.
  69             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
 
  70             throw new InvalidMessageException("Missing field");
 
  /**
   * Builds the header part shared by all CM notifications from the event's
   * {@code commonEventHeader} and the {@code notificationType} field.
   *
   * <p>NOTE(review): line 79 (the {@code Instant} factory call) and the lines after 83 (presumably
   * the builder terminator) are not shown in this listing.
   *
   * @param rootNode root of the parsed message
   * @return header fields populated from {@code rootNode}
   */
  74     private CMBasicHeaderFieldsNotification prepareCMCommonHeaderFields(JsonNode rootNode) {
 
  75         return CMBasicHeaderFieldsNotification.builder()
 
  76             .withCMNodeId(rootNode.at("/event/commonEventHeader/reportingEntityName").textValue())
 
                 // sequence is passed as the node's toString() form, not textValue().
  77             .withCMSequence(rootNode.at("/event/commonEventHeader/sequence").toString())
 
  78             .withCMOccurrenceTime(Instant
 
                         // Integer division by 1000 drops the sub-millisecond part of the
                         // microsecond timestamp; presumably fed to Instant.ofEpochMilli - TODO confirm.
  80                     rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
 
                     // The time is rendered as the string form of the instant at zone "Z".
  81                 .atZone(ZoneId.of("Z")).toString())
 
  82             .withSourceId(rootNode.at("/event/commonEventHeader/sourceId").textValue())
 
  83             .withNotificationType(rootNode.at("/event/stndDefinedFields/data/notificationType").textValue())
 
  /**
   * Sends one CM notification per entry found under
   * {@code /event/stndDefinedFields/data/moiChanges}.
   *
   * <p>NOTE(review): line 90, which turns the located node into the iterator, is not shown in this
   * listing.
   *
   * @param rootNode root of the parsed message
   */
  87     private void processMoiChanges(JsonNode rootNode) {
 
  88         Iterator<JsonNode> nodes = rootNode
 
  89             .at("/event/stndDefinedFields/data/moiChanges")
 
  91         while (nodes.hasNext()) {
 
                 // The iterator is advanced inside preparePayloadMapFromMoiChangesArray, which
                 // calls nodes.next(); that call is what makes this loop terminate.
  92             sendCMNotification(preparePayloadMapFromMoiChangesArray(rootNode, nodes));
 
  /**
   * Consumes the next element of {@code nodes} and turns it, together with the common header
   * fields of {@code rootNode}, into a CM notification payload map.
   *
   * <p>Side effect: advances {@code nodes} by one element. {@code nodes.next()} is called
   * unconditionally, so the caller must check {@code hasNext()} first.
   *
   * <p>NOTE(review): the element's fields are dereferenced without null checks; if
   * {@code get(...)} returns {@code null} for an absent field (TODO confirm), a
   * {@code NullPointerException} propagates to the caller. Lines 107-110, presumably the end of
   * the builder chain, are not shown in this listing.
   *
   * @param rootNode root of the parsed message, used for the header fields
   * @param nodes iterator over change entries; its next element supplies {@code notificationId},
   *        {@code sourceIndicator}, {@code path}, {@code operation} and {@code value}
   * @return the map produced by {@code CMNotificationClient.createCMNotificationPayloadMap}
   */
  96     public Map<String, String> preparePayloadMapFromMoiChangesArray(JsonNode rootNode, Iterator<JsonNode> nodes) {
 
  97         JsonNode slaidNode = nodes.next();
 
  98         return CMNotificationClient.createCMNotificationPayloadMap(
 
  99             CMNotification.builder()
 
 100                 .withCMBasicHeaderFieldsNotification(
 
 101                     prepareCMCommonHeaderFields(rootNode))
 
                     // notificationId and value are passed in toString() form; the other fields use textValue().
 102                 .withCMNotificationId(slaidNode.get("notificationId").toString())
 
 103                 .withCMSourceIndicator(slaidNode.get("sourceIndicator").textValue())
 
 104                 .withCMPath(slaidNode.get("path").textValue())
 
 105                 .withCMOperation(slaidNode.get("operation").textValue())
 
 106                 .withCMValue(slaidNode.get("value").toString()
 
 /**
  * Builds a CM notification payload map for a message that carries a single value, combining the
  * common header fields, the message's {@code sourceIndicator}, and the node found at
  * {@code cmValueKey}.
  *
  * <p>NOTE(review): lines 118-121, presumably the end of the builder chain, are not shown in this
  * listing.
  *
  * @param rootNode root of the parsed message
  * @param cmValueKey JSON pointer expression passed to {@code rootNode.at(...)}; the located
  *        node's {@code toString()} form becomes the notification value
  * @return the map produced by {@code CMNotificationClient.createCMNotificationPayloadMap}
  */
 111     public Map<String, String> preparePayloadMapFromMoi(JsonNode rootNode, String cmValueKey){
 
 112         return CMNotificationClient.createCMNotificationPayloadMap(
 
 113             CMNotification.builder()
 
 114                 .withCMBasicHeaderFieldsNotification(
 
 115                     prepareCMCommonHeaderFields(rootNode))
 
 116                 .withCMSourceIndicator(rootNode.at("/event/stndDefinedFields/data/sourceIndicator").textValue())
 
 117                 .withCMValue(rootNode.at(cmValueKey).toString()
 
 /**
  * Sends one CM notification: obtains a client from {@code setRESTConfAuthorization}, lets it
  * render the payload map into a message, and sends that message.
  *
  * <p>A client is obtained on every call, because {@code setRESTConfAuthorization} constructs a
  * new {@code CMNotificationClient} each time.
  *
  * @param payloadMapMessage payload map passed to {@code prepareMessageFromPayloadMap}
  */
 122     private void sendCMNotification(Map<String, String> payloadMapMessage) {
 
 123         CMNotificationClient cmClient = setRESTConfAuthorization();
 
 124         String message = cmClient.prepareMessageFromPayloadMap(payloadMapMessage);
 
 125         cmClient.sendNotification(message);
 
 /**
  * Creates a {@code CMNotificationClient} for {@code getBaseUrl()} and configures it with the
  * SDNR user and password obtained from {@code getSDNRUser()} and {@code getSDNRPasswd()}.
  *
  * <p>The debug log names only the user; the password is deliberately kept out of the log output.
  *
  * <p>NOTE(review): the lines after 135 (presumably the return of the client) are not shown in
  * this listing.
  *
  * @return the configured client
  */
 129     private CMNotificationClient setRESTConfAuthorization() {
 
 130         String sdnrUser = getSDNRUser();
 
 131         String sdnrPasswd = getSDNRPasswd();
 
 133         CMNotificationClient cmClient = new CMNotificationClient(getBaseUrl());
 
             // Never log sdnrPasswd: log files are readable by a wider audience than the credential store.
 134         LOG.debug("Setting RESTConf Authorization values for user {}", sdnrUser);
 
 135         cmClient.setAuthorization(sdnrUser, sdnrPasswd);