f508c61f71de83cb9823a70ad590b4d6bf355d02
[ccsdk/features.git] / sdnr / wt / websocketmanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / websocketmanager / WebSocketManagerSocket.java
1 /*
2 * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
19
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import java.security.SecureRandom;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Set;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
34 import org.eclipse.jetty.websocket.api.Session;
35 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
36 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.DOMNotificationOutput;
37 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.INotificationOutput;
38 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
39 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
40 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
41 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
42 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
43 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
44 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 public class WebSocketManagerSocket extends WebSocketAdapter {
49
50     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
51     public static final String MSG_KEY_DATA = "data";
52     public static final DataType MSG_KEY_SCOPES = DataType.scopes;
53     public static final String MSG_KEY_PARAM = "param";
54     public static final String MSG_KEY_VALUE = "value";
55     public static final String MSG_KEY_SCOPE = "scope";
56
57     public static final String KEY_NODEID = "nodeId";
58     public static final String KEY_EVENTTYPE = "eventType";
59     private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
60     private static final Pattern PATTERN_SCOPEREGISTRATION =
61             Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
62     private static final SecureRandom RND = new SecureRandom();
63     private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
64     private static final int QUEUE_SIZE = 100;
65
66     private final Thread sendingSyncThread;
67     private final ArrayBlockingQueue<String> messageQueue;
68     private boolean closed;
69
70     private final Runnable sendingRunner = new Runnable() {
71         @Override
72         public void run() {
73             LOG.debug("isrunning");
74             while (!closed) {
75                 try {
76
77                     String message = messageQueue.poll();
78                     if (message != null) {
79                         WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
80                                 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
81                         LOG.debug("message sent");
82                     }
83                 } catch (ExecutionException | TimeoutException e) {
84                     LOG.warn("problem pushing message: ", e);
85                 } catch (InterruptedException e) {
86                     LOG.warn("Interrupted!", e);
87                     // Restore interrupted state...
88                     Thread.currentThread().interrupt();
89                 }
90
91                 if (messageQueue.isEmpty()) {
92                     trySleep(1000);
93                 }
94
95             }
96             LOG.debug("isstopped");
97
98         };
99     };
100
101     private static void trySleep(int sleepMs) {
102         try {
103             Thread.sleep(sleepMs);
104         } catch (InterruptedException e) {
105             Thread.currentThread().interrupt();
106         }
107     }
108
109     /**
110      * list of all sessionids
111      */
112     private static final List<String> sessionIds = new ArrayList<>();
113     /**
114      * map of sessionid <=> UserScopes
115      */
116     private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
117     /**
118      * map of class.hashCode <=> class
119      */
120     private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
121
122     private static final YangToolsMapper mapper = new YangToolsMapper();
123     private final String myUniqueSessionId;
124
125     private Session session = null;
126
127     public interface EventInputCallback {
128         void onMessagePushed(final String message) throws Exception;
129     }
130
131     public WebSocketManagerSocket() {
132         this.myUniqueSessionId = _genSessionId();
133         this.sendingSyncThread = new Thread(this.sendingRunner);
134         this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
135     }
136
137     @Override
138     protected void finalize() throws Throwable {
139         sessionIds.remove(this.myUniqueSessionId);
140     }
141
142     private static String _genSessionId() {
143         String sid = String.valueOf(RND.nextLong());
144         while (sessionIds.contains(sid)) {
145             sid = String.valueOf(RND.nextLong());
146         }
147         sessionIds.add(sid);
148         return sid;
149     }
150
151     @Override
152     public void onWebSocketText(String message) {
153         LOG.debug("{} has sent {}", this.getRemoteAdr(), message);
154         if (!this.manageClientRequest(message)) {
155             this.manageClientRequest2(message);
156         }
157     }
158
159     @Override
160     public void onWebSocketBinary(byte[] payload, int offset, int len) {
161         LOG.debug("Binary not supported");
162     }
163
164     @Override
165     public void onWebSocketConnect(Session sess) {
166         this.session = sess;
167         closed = false;
168         this.sendingSyncThread.start();
169         clientList.put(String.valueOf(this.hashCode()), this);
170         LOG.debug("client connected from {}", this.getRemoteAdr());
171     }
172
173     @Override
174     public void onWebSocketClose(int statusCode, String reason) {
175         clientList.remove(String.valueOf(this.hashCode()));
176         this.sendingSyncThread.interrupt();
177         closed = true;
178         LOG.debug("client disconnected from {}", this.getRemoteAdr());
179     }
180
181     @Override
182     public void onWebSocketError(Throwable cause) {
183         LOG.debug("error caused on {}: ",this.getRemoteAdr(), cause);
184     }
185
186     private String getRemoteAdr() {
187         String adr = "unknown";
188         try {
189             adr = this.session.getRemoteAddress().toString();
190         } catch (Exception e) {
191             LOG.debug("error resolving adr: {}", e.getMessage());
192         }
193         return adr;
194     }
195
196     /**
197      *
198      * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
199      * @return if handled
200      */
201     private boolean manageClientRequest(String request) {
202         boolean ret = false;
203         final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
204         if (!matcher.find()) {
205             return false;
206         }
207         try {
208             ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
209             if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
210                 ret = true;
211                 String sessionId = this.getSessionId();
212                 UserScopes clientDto = new UserScopes();
213                 clientDto.setScopes(registration.getScopes());
214                 userScopesList.put(sessionId, clientDto);
215                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
216             }
217
218         } catch (JsonProcessingException e) {
219             LOG.warn("problem set scope: {}" ,e.getMessage());
220             try {
221                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
222             } catch (JsonProcessingException e1) {
223                 LOG.warn("problem sending error response via ws: ", e1);
224             }
225         }
226         return ret;
227     }
228
229     /*
230      * broadcast message to all your clients
231      */
232     private void manageClientRequest2(String request) {
233         try {
234             NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
235             if (notification.getNodeId() != null && notification.getType() != null) {
236                 this.sendToAll(notification.getNodeId(), notification.getType(), request);
237             }
238         } catch (Exception e) {
239             LOG.warn("handle ws request failed:",e);
240         }
241     }
242
243     public void send(String msg) {
244         try {
245             LOG.trace("sending {}", msg);
246             this.messageQueue.put(msg);
247         } catch (InterruptedException e) {
248             LOG.warn("problem putting message into sending queue: {}", e.getMessage());
249             // Restore interrupted state...
250             Thread.currentThread().interrupt();
251         }
252     }
253
254     public String getSessionId() {
255         return this.myUniqueSessionId;
256     }
257
258     private void sendToAll(INotificationOutput output) {
259         try {
260             sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
261         } catch (JsonProcessingException e) {
262             LOG.warn("problem serializing noitifcation: ", e);
263         }
264     }
265
266     private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
267         if (clientList.size() > 0) {
268             for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
269                 WebSocketManagerSocket socket = entry.getValue();
270                 if (socket != null) {
271                     try {
272                         UserScopes clientScopes = userScopesList.get(socket.getSessionId());
273                         if (clientScopes != null) {
274                             if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
275                                 socket.send(notification);
276                             } else {
277                                 LOG.debug("client has not scope {}", reducedSchemaInfo);
278                             }
279                         } else {
280                             LOG.debug("no scopes for notifications registered");
281                         }
282                     } catch (Exception ioe) {
283                         LOG.warn(ioe.getMessage());
284                     }
285                 } else {
286                     LOG.debug("cannot broadcast. socket is null");
287                 }
288             }
289         }
290     }
291
292     public static void broadCast(INotificationOutput output) {
293         if (clientList.size() > 0) {
294             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
295             WebSocketManagerSocket s = e.iterator().next().getValue();
296             if (s != null) {
297                 s.sendToAll(output);
298             }
299         }
300     }
301
302     public static void broadCast(DOMNotificationOutput domNotificationOutput) {
303         if (clientList.size() > 0) {
304             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
305             WebSocketManagerSocket s = e.iterator().next().getValue();
306             if (s != null) {
307                 s.sendToAll(domNotificationOutput);
308             }
309         }
310     }
311
312 }