Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / vesdomain / stnddefined / StrimziKafkaStndDefinedFaultVESMsgConsumer.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2022 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined;
20
21 import com.fasterxml.jackson.core.JsonProcessingException;
22 import com.fasterxml.jackson.databind.JsonNode;
23 import java.time.Instant;
24 import java.time.ZoneId;
25 import java.util.Map;
26 import org.apache.kafka.clients.admin.Admin;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.FaultNotificationClient;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class StrimziKafkaStndDefinedFaultVESMsgConsumer extends StrimziKafkaVESMsgConsumerImpl {
36
37     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaStndDefinedFaultVESMsgConsumer.class);
38     Map<String, String> payloadMapMessage = null;
39     String faultNodeId;
40     String notificationType;
41
42     public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
43         super(generalConfig, kafkaAdminClient);
44         LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully");
45     }
46
47     /*
48      * Supports processing of notifyNewAlarm and notifyClearedAlarm messages ONLY
49      */
50     @Override
51     public void processMsg(String msg) throws InvalidMessageException, JsonProcessingException {
52         LOG.debug("Processing StndDefined Fault message {}", msg);
53         JsonNode rootNode = convertMessageToJsonNode(msg);
54         try {
55
56             faultNodeId = rootNode.at("/event/commonEventHeader/sourceName").textValue();
57             notificationType = rootNode.at("/event/stndDefinedFields/data/notificationType").textValue();
58
59             if (notificationType.equalsIgnoreCase("notifyNewAlarm")) {
60                 LOG.info("Read stndDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
61                         faultNodeId);
62                 processNewAlarm(rootNode);
63             } else if (notificationType.equalsIgnoreCase("notifyClearedAlarm")) {
64                 LOG.info("Read stdnDefined Fault message of type - {} with id {} from Kafka topic", notificationType,
65                         faultNodeId);
66                 processClearedAlarm(rootNode);
67             } else {
68                 LOG.warn(
69                         "Read stdnDefined Fault message of type - {} with id {} from Kafka topic. No suitable implementation for processing this message",
70                         notificationType, faultNodeId);
71                 throw new InvalidMessageException();
72             }
73             // Send Fault Notification
74             String baseUrl = getBaseUrl();
75             String sdnrUser = getSDNRUser();
76             String sdnrPasswd = getSDNRPasswd();
77
78             FaultNotificationClient faultClient = new FaultNotificationClient(baseUrl);
79             LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
80             faultClient.setAuthorization(sdnrUser, sdnrPasswd);
81             String message = faultClient.prepareMessageFromPayloadMap(payloadMapMessage);
82             faultClient.sendNotification(message);
83         } catch (NullPointerException e) {
84             LOG.warn("Message is invalid, sending aborted, processing stopped because one of fields is missing");
85             throw new InvalidMessageException("Missing field");
86         }
87
88     }
89
90     private void processClearedAlarm(JsonNode rootNode) {
91         String faultOccurrenceTime =
92                 Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
93                         .atZone(ZoneId.of("Z")).toString();
94         int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
95         String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
96         String faultReason = rootNode.at("/event/stndDefinedFields/data/probableCause").textValue();
97         String faultSeverity = SeverityType.NonAlarmed.toString();
98
99         payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
100                 Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
101
102     }
103
104     private void processNewAlarm(JsonNode rootNode) {
105         String faultOccurrenceTime =
106                 Instant.ofEpochMilli(rootNode.at("/event/commonEventHeader/startEpochMicrosec").longValue() / 1000)
107                         .atZone(ZoneId.of("Z")).toString();
108         int faultSequence = rootNode.at("/event/commonEventHeader/sequence").intValue();
109         String faultObjectId = rootNode.at("/event/stndDefinedFields/data/alarmId").textValue();
110         String faultReason = rootNode.at("/event/stndDefinedFields/data/probableCause").textValue();
111         String faultSeverity =
112                 getSDNRSeverityType(rootNode.at("/event/stndDefinedFields/data/perceivedSeverity").textValue());
113
114         payloadMapMessage = FaultNotificationClient.createFaultNotificationPayloadMap(faultNodeId,
115                 Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
116
117     }
118
119     /*
120      * 3GPP Definition PerceivedSeverity: type: string enum: - INDETERMINATE -
121      * CRITICAL - MAJOR - MINOR - WARNING - CLEARED
122      *
123      */
124     private String getSDNRSeverityType(String faultSeverity) {
125         if (faultSeverity.equalsIgnoreCase("critical")) {
126             faultSeverity = SeverityType.Critical.toString();
127         } else if (faultSeverity.equalsIgnoreCase("major")) {
128             faultSeverity = SeverityType.Major.toString();
129         } else if (faultSeverity.equalsIgnoreCase("minor")) {
130             faultSeverity = SeverityType.Minor.toString();
131         } else if (faultSeverity.equalsIgnoreCase("warning") || faultSeverity.equalsIgnoreCase("indeterminate")) {
132             faultSeverity = SeverityType.Warning.toString();
133         } else if (faultSeverity.equalsIgnoreCase("cleared")) {
134             faultSeverity = SeverityType.NonAlarmed.toString();
135         } else {
136             faultSeverity = SeverityType.NonAlarmed.toString();
137         }
138         return faultSeverity;
139     }
140
141 }