2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import com.fasterxml.jackson.databind.JsonNode;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 public class DMaaPCMVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
27 private static final Logger LOG = LoggerFactory.getLogger(DMaaPCMVESMsgConsumer.class);
29 public DMaaPCMVESMsgConsumer(GeneralConfig generalConfig) {
31 LOG.info("DMaaPCMVESMsgConsumer started successfully");
35 public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
36 LOG.debug("Processing CM message {}", msg);
37 JsonNode rootNode = convertMessageToJsonNode(msg);
39 JsonNode notificationNode;
41 dataNode = rootNode.get("event").get("stndDefinedFields").get("data").requireNonNull();
42 if(dataNode.get("notificationType").textValue().equalsIgnoreCase("notifyMOIChanges")) {
43 notificationNode = dataNode.get("moiChanges");
44 LOG.info("Read CM message from DMaaP topic that is moiChanges type with id {}", dataNode.get("notificationId"));
46 } catch (NullPointerException e) {
47 LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
48 throw new InvalidMessageException("Missing field");
50 // take required data from notificationNode