2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pdp.rest.notifications;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.TimeUnit;
34 import javax.websocket.OnClose;
35 import javax.websocket.OnError;
36 import javax.websocket.OnMessage;
37 import javax.websocket.OnOpen;
38 import javax.websocket.Session;
39 import javax.websocket.server.ServerEndpoint;
41 import org.onap.policy.api.PolicyEngineException;
42 import org.onap.policy.common.logging.eelf.MessageCodes;
43 import org.onap.policy.common.logging.eelf.PolicyLogger;
44 import org.onap.policy.common.logging.flexlogger.FlexLogger;
45 import org.onap.policy.common.logging.flexlogger.Logger;
46 import org.onap.policy.pdp.rest.api.services.NotificationService;
47 import org.onap.policy.rest.XACMLRestProperties;
48 import org.onap.policy.utils.BusPublisher;
49 import org.onap.policy.xacml.api.XACMLErrorConstants;
51 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
52 import com.att.nsa.cambria.client.CambriaClientBuilders;
53 import com.att.research.xacml.util.XACMLProperties;
57 * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
58 * WebSockets is being used as a medium for sending Notifications.
59 * UEB is being used as a medium for sending Notifications.
60 * DMAAP is being used as a medium for sending Notifications.
65 @ServerEndpoint(value = "/notifications")
66 public class NotificationServer {
67 private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
68 private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
69 private static String update = null;
72 public void openConnection(Session session) {
73 LOGGER.info("Session Connected: " + session.getId());
78 public void closeConnection(Session session) {
79 queue.remove(session);
83 public void error(Session session, Throwable t) {
84 queue.remove(session);
85 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
90 public void message(String message, Session session) {
92 if(message.equalsIgnoreCase("Manual")) {
94 session.getBasicRemote().sendText(update);
96 } catch (IOException e) {
97 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
98 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
103 public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException {
105 LOGGER.debug("Notification set to " + propNotificationType);
106 if (propNotificationType.equals("ueb")){
110 URL aURL = new URL(pdpURL);
111 topic = aURL.getHost() + aURL.getPort();
112 } catch (MalformedURLException e1) {
113 pdpURL = pdpURL.replace("/", "");
114 topic = pdpURL.replace(":", "");
115 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
116 PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
118 String hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
119 String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
120 String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
122 LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
123 CambriaBatchingPublisher pub = null;
125 if(hosts==null || topic==null || apiKey==null || apiSecret==null){
126 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
127 throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
130 hosts = hosts.trim();
131 topic = topic.trim();
132 apiKey = apiKey.trim();
133 apiSecret = apiSecret.trim();
134 pub = new CambriaClientBuilders.PublisherBuilder ()
135 .usingHosts ( hosts )
137 .authenticatedBy ( apiKey, apiSecret )
141 } catch (MalformedURLException e1) {
142 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
143 } catch (GeneralSecurityException e1) {
144 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
148 pub.send( "MyPartitionKey", notification );
149 } catch (IOException e) {
150 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
152 // close the publisher. The batching publisher does not send events
153 // immediately, so you MUST use close to send any remaining messages.
154 // You provide the amount of time you're willing to wait for the sends
155 // to succeed before giving up. If any messages are unsent after that time,
156 // they're returned to your app. You could, for example, persist to disk
157 // and try again later.
158 final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
160 if (!stuck.isEmpty()){
161 LOGGER.error( stuck.size() + " messages unsent" );
163 LOGGER.info( "Clean exit; all messages sent: " + notification );
166 } else if (propNotificationType.equals("dmaap")) {
168 // Setting up the Publisher for DMaaP MR
169 String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
170 String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
171 String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
172 String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
175 if(dmaapServers==null || topic==null){
176 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
177 throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
180 dmaapServers= dmaapServers.trim();
182 aafLogin= aafLogin.trim();
183 aafPassword= aafPassword.trim();
185 List<String> dmaapList = null;
186 if(dmaapServers.contains(",")) {
187 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
189 dmaapList = new ArrayList<>();
190 dmaapList.add(dmaapServers);
193 BusPublisher publisher =
194 new BusPublisher.DmaapPublisherWrapper(dmaapList,
199 // Sending notification through DMaaP Message Router
200 publisher.send( "MyPartitionKey", notification);
201 LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
204 } catch (Exception e) {
205 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
209 for(Session session: queue) {
211 session.getBasicRemote().sendText(notification);
212 } catch (IOException e) {
213 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
216 NotificationService.sendNotification(notification);
219 public static void setUpdate(String update) {
220 NotificationServer.update = update;