/*
 * ============LICENSE_START========================================================================
 * ONAP : ccsdk feature sdnr wt
 * =================================================================================================
 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 * =================================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END==========================================================================
 */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.DOMNotificationOutput;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.INotificationOutput;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationOutput;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistration.DataType;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ScopeRegistrationResponse;
import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
48 public class WebSocketManagerSocket extends WebSocketAdapter {
50 private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
51 public static final String MSG_KEY_DATA = "data";
52 public static final DataType MSG_KEY_SCOPES = DataType.scopes;
53 public static final String MSG_KEY_PARAM = "param";
54 public static final String MSG_KEY_VALUE = "value";
55 public static final String MSG_KEY_SCOPE = "scope";
57 public static final String KEY_NODEID = "nodeId";
58 public static final String KEY_EVENTTYPE = "eventType";
59 private static final String REGEX_SCOPEREGISTRATION = "\"data\"[\\s]*:[\\s]*\"scopes\"";
60 private static final Pattern PATTERN_SCOPEREGISTRATION =
61 Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
62 private static final Random RND = new Random();
63 private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
64 private static final int QUEUE_SIZE = 100;
66 private final Thread sendingSyncThread;
67 private final ArrayBlockingQueue<String> messageQueue;
68 private boolean closed;
70 private final Runnable sendingRunner = new Runnable() {
73 LOG.debug("isrunning");
77 String message = messageQueue.poll();
78 if (message != null) {
79 WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
80 .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
81 LOG.info("message sent");
83 } catch (InterruptedException | ExecutionException | TimeoutException e) {
84 LOG.warn("problem pushing message: ", e);
87 if (messageQueue.isEmpty()) {
92 LOG.debug("isstopped");
97 private static void trySleep(int sleepMs) {
99 Thread.sleep(sleepMs);
100 } catch (InterruptedException e) {
101 Thread.currentThread().interrupt();
106 * list of all sessionids
108 private static final List<String> sessionIds = new ArrayList<>();
110 * map of sessionid <=> UserScopes
112 private static final HashMap<String, UserScopes> userScopesList = new HashMap<>();
114 * map of class.hashCode <=> class
116 private static final HashMap<String, WebSocketManagerSocket> clientList = new HashMap<>();
118 private static final YangToolsMapper mapper = new YangToolsMapper();
119 private final String myUniqueSessionId;
121 private Session session = null;
/**
 * Callback notified with a pushed text message.
 *
 * NOTE(review): not referenced anywhere else in the visible part of this file.
 */
public interface EventInputCallback {
    /**
     * @param message the pushed message
     * @throws Exception if the receiver fails to handle the message
     */
    void onMessagePushed(final String message) throws Exception;
}
/**
 * Creates a handler with a freshly generated session id, an empty bounded send queue and a
 * sender thread that is created here but not started.
 */
public WebSocketManagerSocket() {
    this.myUniqueSessionId = _genSessionId();
    // Create the queue before the thread that will consume from it.
    this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
    this.sendingSyncThread = new Thread(this.sendingRunner, "ws-sender-" + this.myUniqueSessionId);
    // A sender that is never stopped must not keep the JVM alive.
    this.sendingSyncThread.setDaemon(true);
}
134 protected void finalize() throws Throwable {
135 sessionIds.remove(this.myUniqueSessionId);
138 private static String _genSessionId() {
139 String sid = String.valueOf(RND.nextLong());
140 while (sessionIds.contains(sid)) {
141 sid = String.valueOf(RND.nextLong());
148 public void onWebSocketText(String message) {
149 LOG.info("{} has sent {}", this.getRemoteAdr(), message);
150 if (!this.manageClientRequest(message)) {
151 this.manageClientRequest2(message);
156 public void onWebSocketBinary(byte[] payload, int offset, int len) {
157 LOG.debug("Binary not supported");
161 public void onWebSocketConnect(Session sess) {
164 this.sendingSyncThread.start();
165 clientList.put(String.valueOf(this.hashCode()), this);
166 LOG.debug("client connected from " + this.getRemoteAdr());
170 public void onWebSocketClose(int statusCode, String reason) {
171 clientList.remove(String.valueOf(this.hashCode()));
172 this.sendingSyncThread.interrupt();
174 LOG.debug("client disconnected from " + this.getRemoteAdr());
178 public void onWebSocketError(Throwable cause) {
179 LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
182 private String getRemoteAdr() {
183 String adr = "unknown";
185 adr = this.session.getRemoteAddress().toString();
186 } catch (Exception e) {
187 LOG.debug("error resolving adr: {}", e.getMessage());
194 * @param request is a json object {"data":"scopes","scopes":["scope1","scope2",...]}
197 private boolean manageClientRequest(String request) {
199 final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
200 if (!matcher.find()) {
204 ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
205 if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
207 String sessionId = this.getSessionId();
208 UserScopes clientDto = new UserScopes();
209 clientDto.setScopes(registration.getScopes());
210 userScopesList.put(sessionId, clientDto);
211 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.success(registration.getScopes())));
214 } catch (JsonProcessingException e) {
215 LOG.warn("problem set scope: " + e.getMessage());
217 this.send(mapper.writeValueAsString(ScopeRegistrationResponse.error(e.getMessage())));
218 } catch (JsonProcessingException e1) {
219 LOG.warn("problem sending error response via ws: " + e1);
226 * broadcast message to all your clients
228 private void manageClientRequest2(String request) {
230 NotificationOutput notification = mapper.readValue(request, NotificationOutput.class);
231 if (notification.getNodeId() != null && notification.getType() != null) {
232 this.sendToAll(notification.getNodeId(), notification.getType(), request);
234 } catch (Exception e) {
235 LOG.warn("handle ws request failed:" + e.getMessage());
239 public void send(String msg) {
241 LOG.trace("sending {}", msg);
242 this.messageQueue.put(msg);
243 } catch (InterruptedException e) {
244 LOG.warn("problem putting message into sending queue: " + e.getMessage());
248 public String getSessionId() {
249 return this.myUniqueSessionId;
252 private void sendToAll(INotificationOutput output) {
254 sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
255 } catch (JsonProcessingException e) {
256 LOG.warn("problem serializing noitifcation: ", e);
260 private void sendToAll(String nodeId, ReducedSchemaInfo reducedSchemaInfo, String notification) {
261 if (clientList.size() > 0) {
262 for (Map.Entry<String, WebSocketManagerSocket> entry : clientList.entrySet()) {
263 WebSocketManagerSocket socket = entry.getValue();
264 if (socket != null) {
266 UserScopes clientScopes = userScopesList.get(socket.getSessionId());
267 if (clientScopes != null) {
268 if (clientScopes.hasScope(nodeId, reducedSchemaInfo)) {
269 socket.send(notification);
271 LOG.debug("client has not scope {}", reducedSchemaInfo);
274 LOG.debug("no scopes for notifications registered");
276 } catch (Exception ioe) {
277 LOG.warn(ioe.getMessage());
280 LOG.debug("cannot broadcast. socket is null");
286 public static void broadCast(INotificationOutput output) {
287 if (clientList.size() > 0) {
288 Set<Entry<String, WebSocketManagerSocket>> e = clientList.entrySet();
289 WebSocketManagerSocket s = e.iterator().next().getValue();
296 public static void broadCast(DOMNotificationOutput domNotificationOutput) {
297 // TODO Auto-generated method stub