add fixes for wt sulfur
[ccsdk/features.git] / sdnr / wt / websocketmanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / websocketmanager / WebSocketManagerSocket.java
1 /*
2 * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
19
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import java.security.SecureRandom;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35
36 import com.fasterxml.jackson.databind.ObjectMapper;
37 import com.fasterxml.jackson.databind.SerializationFeature;
38 import org.eclipse.jetty.websocket.api.Session;
39 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
40 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.DOMNotificationOutput;
41 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.INotificationOutput;
42 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
43 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
44 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
45 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
46 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
47 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
48 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public class WebSocketManagerSocket extends WebSocketAdapter {
53
54     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
55     public static final String MSG_KEY_DATA = "data";
56     public static final DataType MSG_KEY_SCOPES = DataType.scopes;
57     public static final String MSG_KEY_PARAM = "param";
58     public static final String MSG_KEY_VALUE = "value";
59     public static final String MSG_KEY_SCOPE = "scope";
60
61     public static final String KEY_NODEID = "nodeId";
62     public static final String KEY_EVENTTYPE = "eventType";
63     private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
64     private static final Pattern PATTERN_SCOPEREGISTRATION =
65             Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
66     private static final SecureRandom RND = new SecureRandom();
67     private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
68     private static final int QUEUE_SIZE = 100;
69
70     private final Thread sendingSyncThread;
71     private final ArrayBlockingQueue<String> messageQueue;
72     private boolean closed;
73
74     private final Runnable sendingRunner = new Runnable() {
75         @Override
76         public void run() {
77             LOG.debug("isrunning");
78             while (!closed) {
79                 try {
80
81                     String message = messageQueue.poll();
82                     if (message != null) {
83                         WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
84                                 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
85                         LOG.info("message sent");
86                     }
87                 } catch (ExecutionException | TimeoutException e) {
88                     LOG.warn("problem pushing message: ", e);
89                 } catch (InterruptedException e) {
90                     LOG.warn("Interrupted!", e);
91                     // Restore interrupted state...
92                     Thread.currentThread().interrupt();
93                 }
94
95                 if (messageQueue.isEmpty()) {
96                     trySleep(1000);
97                 }
98
99             }
100             LOG.debug("isstopped");
101
102         };
103     };
104
105     private static void trySleep(int sleepMs) {
106         try {
107             Thread.sleep(sleepMs);
108         } catch (InterruptedException e) {
109             Thread.currentThread().interrupt();
110         }
111     }
112
113     /**
114      * list of all sessionids
115      */
116     private static final List<String> sessionIds = new ArrayList<>();
117     /**
118      * map of sessionid <=> UserScopes
119      */
120     private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
121     /**
122      * map of class.hashCode <=> class
123      */
124     private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
125
126     private static final ObjectMapper mapper = new YangToolsMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
127     private final String myUniqueSessionId;
128
129     private Session session = null;
130
131     public interface EventInputCallback {
132         void onMessagePushed(final String message) throws Exception;
133     }
134
135     public WebSocketManagerSocket() {
136         this.myUniqueSessionId = _genSessionId();
137         this.sendingSyncThread = new Thread(this.sendingRunner);
138         this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
139
140     }
141
142     @Override
143     protected void finalize() throws Throwable {
144         sessionIds.remove(this.myUniqueSessionId);
145     }
146
147     private static String _genSessionId() {
148         String sid = String.valueOf(RND.nextLong());
149         while (sessionIds.contains(sid)) {
150             sid = String.valueOf(RND.nextLong());
151         }
152         sessionIds.add(sid);
153         return sid;
154     }
155
156     @Override
157     public void onWebSocketText(String message) {
158         LOG.info("{} has sent {}", this.getRemoteAdr(), message);
159         if (!this.manageClientRequest(message)) {
160             this.manageClientRequest2(message);
161         }
162     }
163
164     @Override
165     public void onWebSocketBinary(byte[] payload, int offset, int len) {
166         LOG.debug("Binary not supported");
167     }
168
169     @Override
170     public void onWebSocketConnect(Session sess) {
171         this.session = sess;
172         closed = false;
173         this.sendingSyncThread.start();
174         clientList.put(String.valueOf(this.hashCode()), this);
175         LOG.debug("client connected from {}", this.getRemoteAdr());
176     }
177
178     @Override
179     public void onWebSocketClose(int statusCode, String reason) {
180         clientList.remove(String.valueOf(this.hashCode()));
181         this.sendingSyncThread.interrupt();
182         closed = true;
183         LOG.debug("client disconnected from {}", this.getRemoteAdr());
184     }
185
186     @Override
187     public void onWebSocketError(Throwable cause) {
188         LOG.debug("error caused on {}: ",this.getRemoteAdr(), cause);
189     }
190
191     private String getRemoteAdr() {
192         String adr = "unknown";
193         try {
194             adr = this.session.getRemoteAddress().toString();
195         } catch (Exception e) {
196             LOG.debug("error resolving adr: {}", e.getMessage());
197         }
198         return adr;
199     }
200
201     /**
202      *
203      * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
204      * @return if handled
205      */
206     private boolean manageClientRequest(String request) {
207         boolean ret = false;
208         final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
209         if (!matcher.find()) {
210             return false;
211         }
212         try {
213             ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
214             if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
215                 ret = true;
216                 String sessionId = this.getSessionId();
217                 UserScopes clientDto = new UserScopes();
218                 clientDto.setScopes(registration.getScopes());
219                 userScopesList.put(sessionId, clientDto);
220                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
221             }
222
223         } catch (JsonProcessingException e) {
224             LOG.warn("problem set scope: {}" ,e.getMessage());
225             try {
226                 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
227             } catch (JsonProcessingException e1) {
228                 LOG.warn("problem sending error response via ws: ", e1);
229             }
230         }
231         return ret;
232     }
233
234     /*
235      * broadcast message to all your clients
236      */
237     private void manageClientRequest2(String request) {
238         try {
239             NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
240             if (notification.getNodeId() != null && notification.getType() != null) {
241                 this.sendToAll(notification.getNodeId(), notification.getType(), request);
242             }
243         } catch (Exception e) {
244             LOG.warn("handle ws request failed:",e);
245         }
246     }
247
248     public void send(String msg) {
249         try {
250             LOG.trace("sending {}", msg);
251             this.messageQueue.put(msg);
252         } catch (InterruptedException e) {
253             LOG.warn("problem putting message into sending queue: {}", e.getMessage());
254             // Restore interrupted state...
255             Thread.currentThread().interrupt();
256         }
257     }
258
259     public String getSessionId() {
260         return this.myUniqueSessionId;
261     }
262
263     private void sendToAll(INotificationOutput output) {
264         try {
265             sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
266         } catch (JsonProcessingException e) {
267             LOG.warn("problem serializing noitifcation: ", e);
268         }
269     }
270
271     private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
272         if (clientList.size() > 0) {
273             for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
274                 WebSocketManagerSocket socket = entry.getValue();
275                 if (socket != null) {
276                     try {
277                         UserScopes clientScopes = userScopesList.get(socket.getSessionId());
278                         if (clientScopes != null) {
279                             if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
280                                 socket.send(notification);
281                             } else {
282                                 LOG.debug("client has not scope {}", reducedSchemaInfo);
283                             }
284                         } else {
285                             LOG.debug("no scopes for notifications registered");
286                         }
287                     } catch (Exception ioe) {
288                         LOG.warn(ioe.getMessage());
289                     }
290                 } else {
291                     LOG.debug("cannot broadcast. socket is null");
292                 }
293             }
294         }
295     }
296
297     public static void broadCast(INotificationOutput output) {
298         if (clientList.size() > 0) {
299             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
300             WebSocketManagerSocket s = e.iterator().next().getValue();
301             if (s != null) {
302                 s.sendToAll(output);
303             }
304         }
305     }
306
307     public static void broadCast(DOMNotificationOutput domNotificationOutput) {
308         if (clientList.size() > 0) {
309             Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
310             WebSocketManagerSocket s = e.iterator().next().getValue();
311             if (s != null) {
312                 s.sendToAll(domNotificationOutput);
313             }
314         }
315     }
316
317 }