2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2019 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.pdp.rest.notifications;
24 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.research.xacml.util.XACMLProperties;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.TimeUnit;
39 import javax.websocket.OnClose;
40 import javax.websocket.OnError;
41 import javax.websocket.OnMessage;
42 import javax.websocket.OnOpen;
43 import javax.websocket.Session;
44 import javax.websocket.server.ServerEndpoint;
46 import org.onap.policy.api.PolicyEngineException;
47 import org.onap.policy.common.logging.eelf.MessageCodes;
48 import org.onap.policy.common.logging.eelf.PolicyLogger;
49 import org.onap.policy.common.logging.flexlogger.FlexLogger;
50 import org.onap.policy.common.logging.flexlogger.Logger;
51 import org.onap.policy.pdp.rest.api.services.NotificationService;
52 import org.onap.policy.rest.XacmlRestProperties;
53 import org.onap.policy.utils.BusPublisher;
54 import org.onap.policy.xacml.api.XACMLErrorConstants;
58 * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being
59 * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being
60 * used as a medium for sending Notifications.
65 @ServerEndpoint(value = "/notifications")
66 public class NotificationServer {
67 private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
68 private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
69 private static String update = null;
72 public void openConnection(Session session) {
73 LOGGER.info("Session Connected: " + session.getId());
78 public void closeConnection(Session session) {
79 queue.remove(session);
83 * Error callback method.
84 * @param session The session on which the error occurs
85 * @param throwable exception thrown on the error callback
88 public void error(Session session, Throwable throwable) {
89 queue.remove(session);
90 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: "
91 + throwable.getMessage());
95 * Message callback method.
96 * @param message the message on the callback
97 * @param session The session on which the error occurs
100 public void message(String message, Session session) {
102 if (message.equalsIgnoreCase("Manual")) {
104 session.getBasicRemote().sendText(update);
106 } catch (IOException e) {
107 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
108 + e.getMessage() + e);
109 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
115 * Send a notification.
116 * @param notification The notification type
117 * @param propNotificationType Notification type properties
118 * @param pdpUrl URL of the PDP
119 * @throws PolicyEngineException on errors from the policy engine
120 * @throws IOException exceptions on IO errors
121 * @throws InterruptedException interrupts
123 public static void sendNotification(String notification, String propNotificationType, String pdpUrl)
124 throws PolicyEngineException, IOException, InterruptedException {
126 LOGGER.debug("Notification set to " + propNotificationType);
127 if (propNotificationType.equals("ueb")) {
131 URL notificationUrl = new URL(pdpUrl);
132 topic = notificationUrl.getHost() + notificationUrl.getPort();
133 } catch (MalformedURLException e1) {
134 pdpUrl = pdpUrl.replace("/", "");
135 topic = pdpUrl.replace(":", "");
137 XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
138 PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1,
139 "Error in parsing out pdpURL for UEB notfication ");
141 String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
142 String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
143 String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
145 LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
146 CambriaBatchingPublisher pub = null;
148 if (hosts == null || topic == null || apiKey == null || apiSecret == null) {
149 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
150 + "UEB properties are missing from the property file ");
151 throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
152 + "UEB properties are missing from the property file ");
155 hosts = hosts.trim();
156 topic = topic.trim();
157 apiKey = apiKey.trim();
158 apiSecret = apiSecret.trim();
159 pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic)
160 .authenticatedBy(apiKey, apiSecret).build();
162 } catch (MalformedURLException e1) {
164 XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
165 } catch (GeneralSecurityException e1) {
166 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher"
167 + e1.getMessage() + e1);
171 pub.send("MyPartitionKey", notification);
172 } catch (IOException e) {
173 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
174 + e.getMessage() + e);
176 // close the publisher. The batching publisher does not send events
177 // immediately, so you MUST use close to send any remaining messages.
178 // You provide the amount of time you're willing to wait for the sends
179 // to succeed before giving up. If any messages are unsent after that time,
180 // they're returned to your app. You could, for example, persist to disk
181 // and try again later.
182 final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
184 if (!stuck.isEmpty()) {
185 LOGGER.error(stuck.size() + " messages unsent");
187 LOGGER.info("Clean exit; all messages sent: " + notification);
190 } else if (propNotificationType.equals("dmaap")) {
192 // Setting up the Publisher for DMaaP MR
193 String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
194 String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
195 String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
196 String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
199 if (dmaapServers == null || topic == null) {
200 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
201 + "DMaaP properties are missing from the property file ");
202 throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
203 + "DMaaP properties are missing from the property file ");
206 dmaapServers = dmaapServers.trim();
207 topic = topic.trim();
208 aafLogin = aafLogin.trim();
209 aafPassword = aafPassword.trim();
211 List<String> dmaapList = null;
212 if (dmaapServers.contains(",")) {
213 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
215 dmaapList = new ArrayList<>();
216 dmaapList.add(dmaapServers);
219 BusPublisher publisher =
220 new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
222 // Sending notification through DMaaP Message Router
223 publisher.send("MyPartitionKey", notification);
224 LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
227 } catch (Exception e) {
228 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
229 + e.getMessage() + e);
233 for (Session session : queue) {
235 LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId()
236 + "\n " + "PDPUrl is " + pdpUrl);
237 LOGGER.info("NotificationServer: sending text message");
238 session.getBasicRemote().sendText(notification);
239 } catch (IOException e) {
240 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
241 + e.getMessage() + e);
245 NotificationService.sendNotification(notification);
248 public static void setUpdate(String update) {
249 NotificationServer.update = update;