Fix knock on tabs/SONAR/Checkstyle issues
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / notifications / NotificationServer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2019 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pdp.rest.notifications;
23
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.research.xacml.util.XACMLProperties;
27
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.net.URL;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.TimeUnit;
38
39 import javax.websocket.OnClose;
40 import javax.websocket.OnError;
41 import javax.websocket.OnMessage;
42 import javax.websocket.OnOpen;
43 import javax.websocket.Session;
44 import javax.websocket.server.ServerEndpoint;
45
46 import org.onap.policy.api.PolicyEngineException;
47 import org.onap.policy.common.logging.eelf.MessageCodes;
48 import org.onap.policy.common.logging.eelf.PolicyLogger;
49 import org.onap.policy.common.logging.flexlogger.FlexLogger;
50 import org.onap.policy.common.logging.flexlogger.Logger;
51 import org.onap.policy.pdp.rest.api.services.NotificationService;
52 import org.onap.policy.rest.XacmlRestProperties;
53 import org.onap.policy.utils.BusPublisher;
54 import org.onap.policy.xacml.api.XACMLErrorConstants;
55
56
57 /**
58  * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being
59  * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being
60  * used as a medium for sending Notifications.
61  *
62  * @version 0.2
63  *
64  **/
65 @ServerEndpoint(value = "/notifications")
66 public class NotificationServer {
67     private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
68     private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
69     private static String update = null;
70
71     @OnOpen
72     public void openConnection(Session session) {
73         LOGGER.info("Session Connected: " + session.getId());
74         queue.add(session);
75     }
76
77     @OnClose
78     public void closeConnection(Session session) {
79         queue.remove(session);
80     }
81
82     /**
83      * Error callback method.
84      * @param session The session on which the error occurs
85      * @param throwable exception thrown on the error callback
86      */
87     @OnError
88     public void error(Session session, Throwable throwable) {
89         queue.remove(session);
90         LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: "
91                 + throwable.getMessage());
92     }
93
94     /**
95      * Message callback method.
96      * @param message the message on the callback
97      * @param session The session on which the error occurs
98      */
99     @OnMessage
100     public void message(String message, Session session) {
101
102         if (message.equalsIgnoreCase("Manual")) {
103             try {
104                 session.getBasicRemote().sendText(update);
105                 session.close();
106             } catch (IOException e) {
107                 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
108                         + e.getMessage() + e);
109                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
110             }
111         }
112     }
113
114     /**
115      * Send a notification.
116      * @param notification The notification type
117      * @param propNotificationType Notification type properties
118      * @param pdpUrl URL of the PDP
119      * @throws PolicyEngineException on errors from the policy engine
120      * @throws IOException exceptions on IO errors
121      * @throws InterruptedException interrupts
122      */
123     public static void sendNotification(String notification, String propNotificationType, String pdpUrl)
124             throws PolicyEngineException, IOException, InterruptedException {
125
126         LOGGER.debug("Notification set to " + propNotificationType);
127         if (propNotificationType.equals("ueb")) {
128
129             String topic = null;
130             try {
131                 URL notificationUrl = new URL(pdpUrl);
132                 topic = notificationUrl.getHost() + notificationUrl.getPort();
133             } catch (MalformedURLException e1) {
134                 pdpUrl = pdpUrl.replace("/", "");
135                 topic = pdpUrl.replace(":", "");
136                 LOGGER.error(
137                         XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
138                 PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1,
139                         "Error in parsing out pdpURL for UEB notfication ");
140             }
141             String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
142             String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
143             String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
144
145             LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
146             CambriaBatchingPublisher pub = null;
147             try {
148                 if (hosts == null || topic == null || apiKey == null || apiSecret == null) {
149                     LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
150                             + "UEB properties are missing from the property file ");
151                     throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
152                             + "UEB properties are missing from the property file ");
153                 }
154
155                 hosts = hosts.trim();
156                 topic = topic.trim();
157                 apiKey = apiKey.trim();
158                 apiSecret = apiSecret.trim();
159                 pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic)
160                         .authenticatedBy(apiKey, apiSecret).build();
161
162             } catch (MalformedURLException e1) {
163                 LOGGER.error(
164                         XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
165             } catch (GeneralSecurityException e1) {
166                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher"
167                         + e1.getMessage() + e1);
168             }
169             if (pub != null) {
170                 try {
171                     pub.send("MyPartitionKey", notification);
172                 } catch (IOException e) {
173                     LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
174                             + e.getMessage() + e);
175                 }
176                 // close the publisher. The batching publisher does not send events
177                 // immediately, so you MUST use close to send any remaining messages.
178                 // You provide the amount of time you're willing to wait for the sends
179                 // to succeed before giving up. If any messages are unsent after that time,
180                 // they're returned to your app. You could, for example, persist to disk
181                 // and try again later.
182                 final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
183
184                 if (!stuck.isEmpty()) {
185                     LOGGER.error(stuck.size() + " messages unsent");
186                 } else {
187                     LOGGER.info("Clean exit; all messages sent: " + notification);
188                 }
189             }
190         } else if (propNotificationType.equals("dmaap")) {
191
192             // Setting up the Publisher for DMaaP MR
193             String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
194             String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
195             String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
196             String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
197
198             try {
199                 if (dmaapServers == null || topic == null) {
200                     LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
201                             + "DMaaP properties are missing from the property file ");
202                     throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
203                             + "DMaaP properties are missing from the property file ");
204                 }
205
206                 dmaapServers = dmaapServers.trim();
207                 topic = topic.trim();
208                 aafLogin = aafLogin.trim();
209                 aafPassword = aafPassword.trim();
210
211                 List<String> dmaapList = null;
212                 if (dmaapServers.contains(",")) {
213                     dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
214                 } else {
215                     dmaapList = new ArrayList<>();
216                     dmaapList.add(dmaapServers);
217                 }
218
219                 BusPublisher publisher =
220                         new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
221
222                 // Sending notification through DMaaP Message Router
223                 publisher.send("MyPartitionKey", notification);
224                 LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
225                 publisher.close();
226
227             } catch (Exception e) {
228                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
229                         + e.getMessage() + e);
230             }
231         }
232
233         for (Session session : queue) {
234             try {
235                 LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId()
236                         + "\n " + "PDPUrl is " + pdpUrl);
237                 LOGGER.info("NotificationServer: sending text message");
238                 session.getBasicRemote().sendText(notification);
239             } catch (IOException e) {
240                 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
241                         + e.getMessage() + e);
242             }
243         }
244
245         NotificationService.sendNotification(notification);
246     }
247
248     public static void setUpdate(String update) {
249         NotificationServer.update = update;
250     }
251
252 }