95715be39b0d238e09f985b62f236a1f41426494
[ccsdk/features.git] / sdnr / wt / websocketmanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / websocketmanager / WebSocketManagerSocket.java
1 /*
2 * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
19
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import java.security.SecureRandom;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import org.eclipse.jetty.websocket.api.Session;
36 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
37 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.DOMNotificationOutput;
38 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.INotificationOutput;
39 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
40 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
41 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
42 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
43 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
44 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
45 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class WebSocketManagerSocket extends WebSocketAdapter {
50
51     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
52     public static final String MSG_KEY_DATA = "data";
53     public static final DataType MSG_KEY_SCOPES = DataType.scopes;
54     public static final String MSG_KEY_PARAM = "param";
55     public static final String MSG_KEY_VALUE = "value";
56     public static final String MSG_KEY_SCOPE = "scope";
57
58     public static final String KEY_NODEID = "nodeId";
59     public static final String KEY_EVENTTYPE = "eventType";
60     private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
61     private static final Pattern PATTERN_SCOPEREGISTRATION =
62             Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
63     private static final SecureRandom RND = new SecureRandom();
64     private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
65     private static final int QUEUE_SIZE = 100;
66
67     private final Thread sendingSyncThread;
68     private final ArrayBlockingQueue<String> messageQueue;
69     private boolean closed;
70
71     private final Runnable sendingRunner = new Runnable() {
72         @Override
73         public void run() {
74             LOG.debug("isrunning");
75             while (!closed) {
76                 try {
77
78                     String message = messageQueue.poll();
79                     if (message != null) {
80                         WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
81                                 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
82                         LOG.info("message sent");
83                     }
84                 } catch (ExecutionException | TimeoutException e) {
85                     LOG.warn("problem pushing message: ", e);
86                 } catch (InterruptedException e) {
87                     LOG.warn("Interrupted!", e);
88                     // Restore interrupted state...
89                     Thread.currentThread().interrupt();
90                 }
91
92                 if (messageQueue.isEmpty()) {
93                     trySleep(1000);
94                 }
95
96             }
97             LOG.debug("isstopped");
98
99         };
100     };
101
102     private static void trySleep(int sleepMs) {
103         try {
104             Thread.sleep(sleepMs);
105         } catch (InterruptedException e) {
106             Thread.currentThread().interrupt();
107         }
108     }
109
110     /**
111      * list of all sessionids
112      */
113     private static final List<String> sessionIds = new ArrayList<>();
114     /**
115      * map of sessionid <=> UserScopes
116      */
117     private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
118     /**
119      * map of class.hashCode <=> class
120      */
121     private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
122
123     private static final YangToolsMapper mapper = new YangToolsMapper();
124     private final String myUniqueSessionId;
125
126     private Session session = null;
127
128     public interface EventInputCallback {
129         void onMessagePushed(final String message) throws Exception;
130     }
131
132     public WebSocketManagerSocket() {
133         this.myUniqueSessionId = _genSessionId();
134         this.sendingSyncThread = new Thread(this.sendingRunner);
135         this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
136     }
137
138     @Override
139     protected void finalize() throws Throwable {
140         sessionIds.remove(this.myUniqueSessionId);
141     }
142
143     private static String _genSessionId() {
144         String sid = String.valueOf(RND.nextLong());
145         while (sessionIds.contains(sid)) {
146             sid = String.valueOf(RND.nextLong());
147         }
148         sessionIds.add(sid);
149         return sid;
150     }
151
152     @Override
153     public void onWebSocketText(String message) {
154         LOG.info("{} has sent {}", this.getRemoteAdr(), message);
155         if (!this.manageClientRequest(message)) {
156             this.manageClientRequest2(message);
157         }
158     }
159
160     @Override
161     public void onWebSocketBinary(byte[] payload, int offset, int len) {
162         LOG.debug("Binary not supported");
163     }
164
165     @Override
166     public void onWebSocketConnect(Session sess) {
167         this.session = sess;
168         closed = false;
169         this.sendingSyncThread.start();
170         clientList.put(String.valueOf(this.hashCode()), this);
171         LOG.debug("client connected from {}", this.getRemoteAdr());
172     }
173
174     @Override
175     public void onWebSocketClose(int statusCode, String reason) {
176         clientList.remove(String.valueOf(this.hashCode()));
177         this.sendingSyncThread.interrupt();
178         closed = true;
179         LOG.debug("client disconnected from {}", this.getRemoteAdr());
180     }
181
182     @Override
183     public void onWebSocketError(Throwable cause) {
184         LOG.debug("error caused on {}: ",this.getRemoteAdr(), cause);
185     }
186
187     private String getRemoteAdr() {
188         String adr = "unknown";
189         try {
190             adr = this.session.getRemoteAddress().toString();
191         } catch (Exception e) {
192             LOG.debug("error resolving adr: {}", e.getMessage());
193         }
194         return adr;
195     }
196
197     /**
198      *
199      * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
200      * @return if handled
201      */
202     private boolean manageClientRequest(String request) {
203         boolean ret = false;
204         final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
205         if (!matcher.find()) {
206             return false;
207         }
208         try {
209             ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
210             if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
211                 ret = true;
212                 String sessionId = this.getSessionId();
213                 UserScopes clientDto = new UserScopes();
214                 clientDto.setScopes(registration.getScopes());
215                 userScopesList.put(sessionId, clientDto);
216                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
217             }
218
219         } catch (JsonProcessingException e) {
220             LOG.warn("problem set scope: {}" ,e.getMessage());
221             try {
222                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
223             } catch (JsonProcessingException e1) {
224                 LOG.warn("problem sending error response via ws: ", e1);
225             }
226         }
227         return ret;
228     }
229
230     /*
231      * broadcast message to all your clients
232      */
233     private void manageClientRequest2(String request) {
234         try {
235             NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
236             if (notification.getNodeId() != null && notification.getType() != null) {
237                 this.sendToAll(notification.getNodeId(), notification.getType(), request);
238             }
239         } catch (Exception e) {
240             LOG.warn("handle ws request failed:",e);
241         }
242     }
243
244     public void send(String msg) {
245         try {
246             LOG.trace("sending {}", msg);
247             this.messageQueue.put(msg);
248         } catch (InterruptedException e) {
249             LOG.warn("problem putting message into sending queue: {}", e.getMessage());
250             // Restore interrupted state...
251             Thread.currentThread().interrupt();
252         }
253     }
254
255     public String getSessionId() {
256         return this.myUniqueSessionId;
257     }
258
259     private void sendToAll(INotificationOutput output) {
260         try {
261             sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
262         } catch (JsonProcessingException e) {
263             LOG.warn("problem serializing noitifcation: ", e);
264         }
265     }
266
267     private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
268         if (clientList.size() > 0) {
269             for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
270                 WebSocketManagerSocket socket = entry.getValue();
271                 if (socket != null) {
272                     try {
273                         UserScopes clientScopes = userScopesList.get(socket.getSessionId());
274                         if (clientScopes != null) {
275                             if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
276                                 socket.send(notification);
277                             } else {
278                                 LOG.debug("client has not scope {}", reducedSchemaInfo);
279                             }
280                         } else {
281                             LOG.debug("no scopes for notifications registered");
282                         }
283                     } catch (Exception ioe) {
284                         LOG.warn(ioe.getMessage());
285                     }
286                 } else {
287                     LOG.debug("cannot broadcast. socket is null");
288                 }
289             }
290         }
291     }
292
293     public static void broadCast(INotificationOutput output) {
294         if (clientList.size() > 0) {
295             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
296             WebSocketManagerSocket s = e.iterator().next().getValue();
297             if (s != null) {
298                 s.sendToAll(output);
299             }
300         }
301     }
302
303     public static void broadCast(DOMNotificationOutput domNotificationOutput) {
304         if (clientList.size() > 0) {
305             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
306             WebSocketManagerSocket s = e.iterator().next().getValue();
307             if (s != null) {
308                 s.sendToAll(domNotificationOutput);
309             }
310         }
311     }
312
313 }