Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / vesdomain / fault / StrimziKafkaFaultVESMsgConsumer.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  */
19
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault;
21
22 import com.fasterxml.jackson.databind.JsonNode;
23 import com.fasterxml.jackson.databind.ObjectMapper;
24 import java.io.IOException;
25 import java.time.Instant;
26 import java.time.ZoneId;
27 import java.util.Map;
28 import org.apache.kafka.clients.admin.Admin;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class StrimziKafkaFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
36
37     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
38
39     public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
40         super(generalConfig, kafkaAdminClient);
41     }
42
43     @Override
44     public void processMsg(String msg) throws Exception {
45         String faultNodeId;
46         String faultOccurrenceTime;
47         String faultObjectId;
48         String faultReason;
49         String faultSeverity;
50         String vesDomain;
51         int faultSequence;
52         String reportingEntityName;
53         ObjectMapper oMapper = new ObjectMapper();
54         JsonNode sKafkaMessageRootNode;
55
56         LOG.info("Fault VES Message is - {}", msg);
57         try {
58                 sKafkaMessageRootNode = oMapper.readTree(msg);
59             reportingEntityName = sKafkaMessageRootNode.at("/event/commonEventHeader/reportingEntityName").textValue();
60             if (reportingEntityName.equals("ONAP SDN-R")) {
61                 LOG.info(
62                         "VES Fault message generated by SDNR, hence no need to process any further; Ignoring the received message");
63                 return;
64             }
65
66             vesDomain = sKafkaMessageRootNode.at("/event/commonEventHeader/domain").textValue();
67             if (!vesDomain.equalsIgnoreCase("fault")) {
68                 LOG.warn("Received {} domain VES Message in Kafka Fault topic, ignoring it", vesDomain);
69                 return;
70             }
71             faultNodeId = sKafkaMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
72             faultOccurrenceTime = Instant
73                     .ofEpochMilli(
74                             sKafkaMessageRootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
75                     .atZone(ZoneId.of("Z")).toString();
76             faultObjectId = sKafkaMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
77             faultReason = sKafkaMessageRootNode.at("/event/faultFields/specificProblem").textValue();
78             faultSeverity = sKafkaMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
79             faultSequence = sKafkaMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
80
81             if (faultSeverity.equalsIgnoreCase("critical")) {
82                 faultSeverity = SeverityType.Critical.toString();
83             } else if (faultSeverity.equalsIgnoreCase("major")) {
84                 faultSeverity = SeverityType.Major.toString();
85             } else if (faultSeverity.equalsIgnoreCase("minor")) {
86                 faultSeverity = SeverityType.Minor.toString();
87             } else if (faultSeverity.equalsIgnoreCase("warning")) {
88                 faultSeverity = SeverityType.Warning.toString();
89             } else if (faultSeverity.equalsIgnoreCase("nonalarmed")) {
90                 faultSeverity = SeverityType.NonAlarmed.toString();
91             } else {
92                 faultSeverity = SeverityType.NonAlarmed.toString();
93             }
94
95             String baseUrl = getBaseUrl();
96             String sdnrUser = getSDNRUser();
97             String sdnrPasswd = getSDNRPasswd();
98
99             Map<String, String> payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
100                     Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
101
102             FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl);
103             LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
104             faultClient.setAuthorization(sdnrUser, sdnrPasswd);
105             String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage);
106             faultClient.sendNotification(message);
107
108         } catch (IOException e) {
109             LOG.info("Cannot parse json object ");
110             throw new Exception("Cannot parse json object", e);
111         }
112     }
113
114 }