8c962192b53a1d946975089ad3230492db422b78
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / notifications / NotificationServer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pdp.rest.notifications;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.websocket.OnClose;
35 import javax.websocket.OnError;
36 import javax.websocket.OnMessage;
37 import javax.websocket.OnOpen;
38 import javax.websocket.Session;
39 import javax.websocket.server.ServerEndpoint;
40
41 import org.onap.policy.api.PolicyEngineException;
42 import org.onap.policy.common.logging.eelf.MessageCodes;
43 import org.onap.policy.common.logging.eelf.PolicyLogger;
44 import org.onap.policy.common.logging.flexlogger.FlexLogger;
45 import org.onap.policy.common.logging.flexlogger.Logger;
46 import org.onap.policy.pdp.rest.api.services.NotificationService;
47 import org.onap.policy.rest.XacmlRestProperties;
48 import org.onap.policy.utils.BusPublisher;
49 import org.onap.policy.xacml.api.XACMLErrorConstants;
50
51 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
52 import com.att.nsa.cambria.client.CambriaClientBuilders;
53 import com.att.research.xacml.util.XACMLProperties;
54
55
56 /**
57  * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
58  * WebSockets is being used as a medium for sending Notifications.
59  * UEB is being used as a medium for sending Notifications.
60  * DMAAP is being used as a medium for sending Notifications.
61  *
62  * @version 0.2
63  *
64  **/
65 @ServerEndpoint(value = "/notifications")
66 public class NotificationServer {
67         private static final Logger LOGGER      = FlexLogger.getLogger(NotificationServer.class);
68         private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
69         private static String update = null;
70
71         @OnOpen
72         public void openConnection(Session session) {
73                 LOGGER.info("Session Connected: " + session.getId());
74                 queue.add(session);
75         }
76
77         @OnClose
78         public void closeConnection(Session session) {
79                 queue.remove(session);
80         }
81
82         @OnError
83         public void error(Session session, Throwable t) {
84                 queue.remove(session);
85                 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
86
87         }
88
89         @OnMessage
90         public void message(String message, Session session) {
91
92                 if(message.equalsIgnoreCase("Manual")) {
93                         try {
94                                 session.getBasicRemote().sendText(update);
95                                 session.close();
96                         } catch (IOException e) {
97                                 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
98                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
99                         }
100                 }
101         }
102
103         public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException {
104
105                 LOGGER.debug("Notification set to " + propNotificationType);
106                 if (propNotificationType.equals("ueb")){
107
108                         String topic = null;
109                         try {
110                                 URL aURL = new URL(pdpURL);
111                                 topic = aURL.getHost() + aURL.getPort();
112                         } catch (MalformedURLException e1) {
113                                 pdpURL = pdpURL.replace("/", "");
114                                 topic = pdpURL.replace(":", "");
115                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
116                                 PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
117                         }
118                         String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
119                         String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
120                         String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
121
122                         LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
123                         CambriaBatchingPublisher pub = null;
124                         try {
125                                 if(hosts==null || topic==null || apiKey==null || apiSecret==null){
126                                         LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
127                                         throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
128                                 }
129
130                                 hosts = hosts.trim();
131                                 topic = topic.trim();
132                                 apiKey = apiKey.trim();
133                                 apiSecret = apiSecret.trim();
134                                 pub = new CambriaClientBuilders.PublisherBuilder ()
135                                                 .usingHosts ( hosts )
136                                                 .onTopic ( topic )
137                                                 .authenticatedBy ( apiKey, apiSecret )
138                                                 .build ()
139                                                 ;
140
141                         } catch (MalformedURLException e1) {
142                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
143                         } catch (GeneralSecurityException e1) {
144                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
145                         }
146                         if(pub != null){
147                                 try {
148                                         pub.send( "MyPartitionKey", notification );
149                                 } catch (IOException e) {
150                                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
151                                 }
152                                 // close the publisher. The batching publisher does not send events
153                                 // immediately, so you MUST use close to send any remaining messages.
154                                 // You provide the amount of time you're willing to wait for the sends
155                                 // to succeed before giving up. If any messages are unsent after that time,
156                                 // they're returned to your app. You could, for example, persist to disk
157                                 // and try again later.
158                                 final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
159
160                                 if (!stuck.isEmpty()){
161                                         LOGGER.error( stuck.size() + " messages unsent" );
162                                 }else{
163                                         LOGGER.info( "Clean exit; all messages sent: " + notification );
164                                 }
165                         }
166                 } else if (propNotificationType.equals("dmaap")) {
167
168                         // Setting up the Publisher for DMaaP MR
169                         String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
170                         String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
171                         String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
172                         String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
173
174                         try {
175                                 if(dmaapServers==null || topic==null){
176                                         LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
177                                         throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
178                                 }
179
180                                 dmaapServers= dmaapServers.trim();
181                                 topic= topic.trim();
182                                 aafLogin= aafLogin.trim();
183                                 aafPassword= aafPassword.trim();
184
185                                 List<String> dmaapList = null;
186                                 if(dmaapServers.contains(",")) {
187                                         dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
188                                 } else {
189                                         dmaapList = new ArrayList<>();
190                                         dmaapList.add(dmaapServers);
191                                 }
192
193                                 BusPublisher publisher =
194                                                 new BusPublisher.DmaapPublisherWrapper(dmaapList,
195                                                                                                topic,
196                                                                                                aafLogin,
197                                                                                                aafPassword);
198
199                                 // Sending notification through DMaaP Message Router
200                                 publisher.send( "MyPartitionKey", notification);
201                                 LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
202                                 publisher.close();
203
204                         } catch (Exception e) {
205                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
206                         }
207                 }
208
209                 for(Session session: queue) {
210                         try {
211                                 LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n "
212                                                 + "PDPUrl is " + pdpURL);
213                         LOGGER.info("NotificationServer: sending text message");
214                                 session.getBasicRemote().sendText(notification);
215                         } catch (IOException e) {
216                                 LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
217                         }
218                 }
219
220                 NotificationService.sendNotification(notification);
221         }
222
223         public static void setUpdate(String update) {
224                 NotificationServer.update = update;
225         }
226
227 }