2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7 * =================================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software distributed under the License
14 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15 * or implied. See the License for the specific language governing permissions and limitations under the License.
17 * ============LICENSE_END==========================================================================
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
22 import com.fasterxml.jackson.databind.JsonNode;
23 import com.fasterxml.jackson.databind.ObjectMapper;
24 import java.io.IOException;
25 import java.time.Instant;
26 import java.time.ZoneId;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
/**
 * Kafka consumer for fault-domain VES messages, built on {@code StrimziKafkaVESMsgConsumerImpl}.
 *
 * <p>{@link #processMsg(String)} parses one message as JSON, extracts the fault fields from it and
 * hands them to a {@code FaultNotificationClient} for delivery.
 */
35 public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
37 private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
/**
 * Creates the consumer from the given configuration.
 *
 * @param generalConfig configuration object for this consumer
 */
39 public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) {
/**
 * Processes a single fault VES message.
 *
 * <p>The message is parsed with Jackson. Fields are read from {@code /event/commonEventHeader}
 * (reportingEntityName, domain, sourceName, startEpochMicrosec, sequence) and from
 * {@code /event/faultFields} (alarmInterfaceA, specificProblem, eventSeverity). The severity text is
 * mapped case-insensitively onto a {@code SeverityType} name, a payload map is built via
 * {@code FaultNotificationClient.createFaultNotificationPayloadMap}, and the resulting message is
 * sent with {@code sendNotification}.
 *
 * @param msg the raw VES message text; it is logged at INFO level before parsing
 * @throws Exception wrapping the {@link IOException} (message "Cannot parse json object") when the
 *         JSON handling fails
 */
44 public void processMsg(String msg) throws Exception {
46 String faultOccurrenceTime;
52 String reportingEntityName;
53 ObjectMapper oMapper = new ObjectMapper();
54 JsonNode sKafkaMessageRootNode;
56 LOG.info("Fault VES Message is - {}", msg);
58 sKafkaMessageRootNode = oMapper.readTree(msg);
// NOTE(review): textValue() yields null when the addressed node is absent or not a string, so the
// equals()/equalsIgnoreCase() calls below would throw NullPointerException on such input - confirm
// whether upstream validation guarantees these header fields.
59 reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
// Messages whose reporting entity is "ONAP SDN-R" were produced by SDN-R itself; per the log text
// they are not processed any further.
60 if (reportingEntityName.equals("ONAP SDN-R")) {
62 "VES Fault message generated by SDNR, hence no need to process any further; Ignoring the received message");
66 vesDomain = sKafkaMessageRootNode.at("/event/commonEventHeader/domain").textValue();
// Only the "fault" domain (compared case-insensitively) is expected here; anything else is logged
// as a warning and, per the log text, ignored.
67 if (!vesDomain.equalsIgnoreCase("fault")) {
68 LOG.warn("Received {} domain VES Message in Kafka Fault topic, ignoring it", vesDomain);
71 faultNodeId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
// startEpochMicrosec is divided by 1000 (microseconds -> milliseconds) and the resulting Instant is
// rendered in the "Z" (UTC) zone as a string.
72 faultOccurrenceTime = Instant
74 sKafkaMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
75 .atZone(ZoneId.of("Z")).toString();
76 faultObjectId = sKafkaMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
77 faultReason = sKafkaMessageRootNode.at("/event/faultFields/specificProblem").textValue();
78 faultSeverity = sKafkaMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
79 faultSequence = sKafkaMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
// Normalise the incoming severity text (case-insensitive) to the SeverityType name.
81 if (faultSeverity.equalsIgnoreCase("critical")) {
82 faultSeverity = SeverityType.Critical.toString();
83 } else if (faultSeverity.equalsIgnoreCase("major")) {
84 faultSeverity = SeverityType.Major.toString();
85 } else if (faultSeverity.equalsIgnoreCase("minor")) {
86 faultSeverity = SeverityType.Minor.toString();
87 } else if (faultSeverity.equalsIgnoreCase("warning")) {
88 faultSeverity = SeverityType.Warning.toString();
89 } else if (faultSeverity.equalsIgnoreCase("nonalarmed")) {
90 faultSeverity = SeverityType.NonAlarmed.toString();
// Presumably the fallback branch: any unrecognised severity is also reported as NonAlarmed.
92 faultSeverity = SeverityType.NonAlarmed.toString();
// Connection settings for the notification target, obtained from getBaseUrl/getSDNRUser/getSDNRPasswd.
95 String baseUrl = getBaseUrl();
96 String sdnrUser = getSDNRUser();
97 String sdnrPasswd = getSDNRPasswd();
99 Map<String, String> payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
100 Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
102 FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl);
// NOTE(review): this writes the SDN-R password to the log in clear text whenever DEBUG is enabled;
// consider logging only the user name.
103 LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
104 faultClient.setAuthorization(sdnrUser, sdnrPasswd);
105 String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage);
106 faultClient.sendNotification(message);
// The IOException is rethrown wrapped in a generic Exception with the original as cause.
// NOTE(review): the log line below omits the exception and is emitted at INFO level.
108 } catch (IOException e) {
109 LOG.info("Cannot parse json object ");
110 throw new Exception("Cannot parse json object", e);