2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
20 import com.fasterxml.jackson.core.JsonProcessingException;
21 import java.security.SecureRandom;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.Map.Entry;
27 import java.util.Random;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
36 import com.fasterxml.jackson.databind.ObjectMapper;
37 import com.fasterxml.jackson.databind.SerializationFeature;
38 import org.eclipse.jetty.websocket.api.Session;
39 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
40 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.DOMNotificationOutput;
41 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.INotificationOutput;
42 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
43 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
44 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
45 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
46 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
47 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
48 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 public class WebSocketManagerSocket extends WebSocketAdapter {
54 private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
55 public static final String MSG_KEY_DATA = "data";
56 public static final DataType MSG_KEY_SCOPES = DataType.scopes;
57 public static final String MSG_KEY_PARAM = "param";
58 public static final String MSG_KEY_VALUE = "value";
59 public static final String MSG_KEY_SCOPE = "scope";
61 public static final String KEY_NODEID = "nodeId";
62 public static final String KEY_EVENTTYPE = "eventType";
63 private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
64 private static final Pattern PATTERN_SCOPEREGISTRATION =
65 Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
66 private static final SecureRandom RND = new SecureRandom();
67 private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
68 private static final int QUEUE_SIZE = 100;
70 private final Thread sendingSyncThread;
71 private final ArrayBlockingQueue<String> messageQueue;
72 private boolean closed;
74 private final Runnable sendingRunner = new Runnable() {
77 LOG.debug("isrunning");
81 String message = messageQueue.poll();
82 if (message != null) {
83 WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
84 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
85 LOG.info("message sent");
87 } catch (ExecutionException | TimeoutException e) {
88 LOG.warn("problem pushing message: ", e);
89 } catch (InterruptedException e) {
90 LOG.warn("Interrupted!", e);
91 // Restore interrupted state...
92 Thread.currentThread().interrupt();
95 if (messageQueue.isEmpty()) {
100 LOG.debug("isstopped");
105 private static void trySleep(int sleepMs) {
107 Thread.sleep(sleepMs);
108 } catch (InterruptedException e) {
109 Thread.currentThread().interrupt();
114 * list of all sessionids
116 private static final List<String> sessionIds = new ArrayList<>();
118 * map of sessionid <=> UserScopes
120 private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
122 * map of class.hashCode <=> class
124 private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
126 private static final ObjectMapper mapper = new YangToolsMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
127 private final String myUniqueSessionId;
129 private Session session = null;
131 public interface EventInputCallback {
132 void onMessagePushed(final String message) throws Exception;
135 public WebSocketManagerSocket() {
136 this.myUniqueSessionId = _genSessionId();
137 this.sendingSyncThread = new Thread(this.sendingRunner);
138 this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
143 protected void finalize() throws Throwable {
144 sessionIds.remove(this.myUniqueSessionId);
147 private static String _genSessionId() {
148 String sid = String.valueOf(RND.nextLong());
149 while (sessionIds.contains(sid)) {
150 sid = String.valueOf(RND.nextLong());
157 public void onWebSocketText(String message) {
158 LOG.info("{} has sent {}", this.getRemoteAdr(), message);
159 if (!this.manageClientRequest(message)) {
160 this.manageClientRequest2(message);
165 public void onWebSocketBinary(byte[] payload, int offset, int len) {
166 LOG.debug("Binary not supported");
170 public void onWebSocketConnect(Session sess) {
173 this.sendingSyncThread.start();
174 clientList.put(String.valueOf(this.hashCode()), this);
175 LOG.debug("client connected from {}", this.getRemoteAdr());
179 public void onWebSocketClose(int statusCode, String reason) {
180 clientList.remove(String.valueOf(this.hashCode()));
181 this.sendingSyncThread.interrupt();
183 LOG.debug("client disconnected from {}", this.getRemoteAdr());
187 public void onWebSocketError(Throwable cause) {
188 LOG.debug("error caused on {}: ",this.getRemoteAdr(), cause);
191 private String getRemoteAdr() {
192 String adr = "unknown";
194 adr = this.session.getRemoteAddress().toString();
195 } catch (Exception e) {
196 LOG.debug("error resolving adr: {}", e.getMessage());
203 * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
206 private boolean manageClientRequest(String request) {
208 final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
209 if (!matcher.find()) {
213 ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
214 if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
216 String sessionId = this.getSessionId();
217 UserScopes clientDto = new UserScopes();
218 clientDto.setScopes(registration.getScopes());
219 userScopesList.put(sessionId, clientDto);
220 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
223 } catch (JsonProcessingException e) {
224 LOG.warn("problem set scope: {}" ,e.getMessage());
226 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
227 } catch (JsonProcessingException e1) {
228 LOG.warn("problem sending error response via ws: ", e1);
235 * broadcast message to all your clients
237 private void manageClientRequest2(String request) {
239 NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
240 if (notification.getNodeId() != null && notification.getType() != null) {
241 this.sendToAll(notification.getNodeId(), notification.getType(), request);
243 } catch (Exception e) {
244 LOG.warn("handle ws request failed:",e);
248 public void send(String msg) {
250 LOG.trace("sending {}", msg);
251 this.messageQueue.put(msg);
252 } catch (InterruptedException e) {
253 LOG.warn("problem putting message into sending queue: {}", e.getMessage());
254 // Restore interrupted state...
255 Thread.currentThread().interrupt();
259 public String getSessionId() {
260 return this.myUniqueSessionId;
263 private void sendToAll(INotificationOutput output) {
265 sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
266 } catch (JsonProcessingException e) {
267 LOG.warn("problem serializing noitifcation: ", e);
271 private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
272 if (clientList.size() > 0) {
273 for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
274 WebSocketManagerSocket socket = entry.getValue();
275 if (socket != null) {
277 UserScopes clientScopes = userScopesList.get(socket.getSessionId());
278 if (clientScopes != null) {
279 if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
280 socket.send(notification);
282 LOG.debug("client has not scope {}", reducedSchemaInfo);
285 LOG.debug("no scopes for notifications registered");
287 } catch (Exception ioe) {
288 LOG.warn(ioe.getMessage());
291 LOG.debug("cannot broadcast. socket is null");
297 public static void broadCast(INotificationOutput output) {
298 if (clientList.size() > 0) {
299 Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
300 WebSocketManagerSocket s = e.iterator().next().getValue();
307 public static void broadCast(DOMNotificationOutput domNotificationOutput) {
308 if (clientList.size() > 0) {
309 Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
310 WebSocketManagerSocket s = e.iterator().next().getValue();
312 s.sendToAll(domNotificationOutput);