Improve Websocket notification interface
[ccsdk/features.git] / sdnr / wt / websocketmanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / websocketmanager / WebSocketManagerSocket.java
1 /*
2 * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
19
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Random;
27 import java.util.Set;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
34 import org.eclipse.jetty.websocket.api.Session;
35 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
36 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
37 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
38 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
39 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
40 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
41 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
42 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class WebSocketManagerSocket extends WebSocketAdapter {
47
48     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
49     public static final String MSG_KEY_DATA = "data";
50     public static final DataType MSG_KEY_SCOPES = DataType.scopes;
51     public static final String MSG_KEY_PARAM = "param";
52     public static final String MSG_KEY_VALUE = "value";
53     public static final String MSG_KEY_SCOPE = "scope";
54
55     public static final String KEY_NODEID = "nodeId";
56     public static final String KEY_EVENTTYPE = "eventType";
57     private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
58     private static final Pattern PATTERN_SCOPEREGISTRATION =
59             Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
60     private static final Random RND = new Random();
61     private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
62     private static final int QUEUE_SIZE = 100;
63
64     private final Thread sendingSyncThread;
65     private final ArrayBlockingQueue<String> messageQueue;
66     private boolean closed;
67
68     private final Runnable sendingRunner = new Runnable() {
69         @Override
70         public void run() {
71             LOG.debug("isrunning");
72             while (!closed) {
73                 try {
74
75                     String message = messageQueue.poll();
76                     if (message != null) {
77                         WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
78                                 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
79                         LOG.info("message sent");
80                     }
81                 } catch (InterruptedException | ExecutionException | TimeoutException e) {
82                     LOG.warn("problem pushing message: ", e);
83                 }
84
85                 if (messageQueue.isEmpty()) {
86                     trySleep(1000);
87                 }
88
89             }
90             LOG.debug("isstopped");
91
92         };
93     };
94
95     private static void trySleep(int sleepMs) {
96         try {
97             Thread.sleep(sleepMs);
98         } catch (InterruptedException e) {
99             Thread.currentThread().interrupt();
100         }
101     }
102
103     /**
104      * list of all sessionids
105      */
106     private static final List<String> sessionIds = new ArrayList<>();
107     /**
108      * map of sessionid <=> UserScopes
109      */
110     private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
111     /**
112      * map of class.hashCode <=> class
113      */
114     private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
115
116     private static final YangToolsMapper mapper = new YangToolsMapper();
117     private final String myUniqueSessionId;
118
119     private Session session = null;
120
121     public interface EventInputCallback {
122         void onMessagePushed(final String message) throws Exception;
123     }
124
125     public WebSocketManagerSocket() {
126         this.myUniqueSessionId = _genSessionId();
127         this.sendingSyncThread = new Thread(this.sendingRunner);
128         this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
129     }
130
131     @Override
132     protected void finalize() throws Throwable {
133         sessionIds.remove(this.myUniqueSessionId);
134     }
135
136     private static String _genSessionId() {
137         String sid = String.valueOf(RND.nextLong());
138         while (sessionIds.contains(sid)) {
139             sid = String.valueOf(RND.nextLong());
140         }
141         sessionIds.add(sid);
142         return sid;
143     }
144
145     @Override
146     public void onWebSocketText(String message) {
147         LOG.info("{} has sent {}", this.getRemoteAdr(), message);
148         if (!this.manageClientRequest(message)) {
149             this.manageClientRequest2(message);
150         }
151     }
152
153     @Override
154     public void onWebSocketBinary(byte[] payload, int offset, int len) {
155         LOG.debug("Binary not supported");
156     }
157
158     @Override
159     public void onWebSocketConnect(Session sess) {
160         this.session = sess;
161         closed = false;
162         this.sendingSyncThread.start();
163         clientList.put(String.valueOf(this.hashCode()), this);
164         LOG.debug("client connected from " + this.getRemoteAdr());
165     }
166
167     @Override
168     public void onWebSocketClose(int statusCode, String reason) {
169         clientList.remove(String.valueOf(this.hashCode()));
170         this.sendingSyncThread.interrupt();
171         closed = true;
172         LOG.debug("client disconnected from " + this.getRemoteAdr());
173     }
174
175     @Override
176     public void onWebSocketError(Throwable cause) {
177         LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
178     }
179
180     private String getRemoteAdr() {
181         String adr = "unknown";
182         try {
183             adr = this.session.getRemoteAddress().toString();
184         } catch (Exception e) {
185             LOG.debug("error resolving adr: {}", e.getMessage());
186         }
187         return adr;
188     }
189
190     /**
191      *
192      * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
193      * @return if handled
194      */
195     private boolean manageClientRequest(String request) {
196         boolean ret = false;
197         final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
198         if (!matcher.find()) {
199             return false;
200         }
201         try {
202             ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
203             if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
204                 ret = true;
205                 String sessionId = this.getSessionId();
206                 UserScopes clientDto = new UserScopes();
207                 clientDto.setScopes(registration.getScopes());
208                 userScopesList.put(sessionId, clientDto);
209                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
210             }
211
212         } catch (JsonProcessingException e) {
213             LOG.warn("problem set scope: " + e.getMessage());
214             try {
215                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
216             } catch (JsonProcessingException e1) {
217                 LOG.warn("problem sending error response via ws: " + e1);
218             }
219         }
220         return ret;
221     }
222
223     /*
224      * broadcast message to all your clients
225      */
226     private void manageClientRequest2(String request) {
227         try {
228             NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
229             if (notification.getNodeId() != null && notification.getType() != null) {
230                 this.sendToAll(notification.getNodeId(), notification.getType(), request);
231             }
232         } catch (Exception e) {
233             LOG.warn("handle ws request failed:" + e.getMessage());
234         }
235     }
236
237     public void send(String msg) {
238         try {
239             LOG.trace("sending {}", msg);
240             this.messageQueue.put(msg);
241         } catch (InterruptedException e) {
242             LOG.warn("problem putting message into sending queue: " + e.getMessage());
243         }
244     }
245
246     public String getSessionId() {
247         return this.myUniqueSessionId;
248     }
249
250     private void sendToAll(NotificationOutput output) {
251         try {
252             sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
253         } catch (JsonProcessingException e) {
254             LOG.warn("problem serializing noitifcation: ", e);
255         }
256     }
257
258     private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
259         if (clientList.size() > 0) {
260             for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
261                 WebSocketManagerSocket socket = entry.getValue();
262                 if (socket != null) {
263                     try {
264                         UserScopes clientScopes = userScopesList.get(socket.getSessionId());
265                         if (clientScopes != null) {
266                             if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
267                                 socket.send(notification);
268                             } else {
269                                 LOG.debug("client has not scope {}", reducedSchemaInfo);
270                             }
271                         } else {
272                             LOG.debug("no scopes for notifications registered");
273                         }
274                     } catch (Exception ioe) {
275                         LOG.warn(ioe.getMessage());
276                     }
277                 } else {
278                     LOG.debug("cannot broadcast. socket is null");
279                 }
280             }
281         }
282     }
283
284     public static void broadCast(NotificationOutput output) {
285         if (clientList.size() > 0) {
286             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
287             WebSocketManagerSocket s = e.iterator().next().getValue();
288             if (s != null) {
289                 s.sendToAll(output);
290             }
291         }
292     }
293
294 }