2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
20 import java.io.IOException;
22 import java.net.URISyntaxException;
23 import java.util.ArrayList;
24 import javax.servlet.ServletException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
28 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
29 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerSocket.EventInputCallback;
30 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig;
31 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterConfig;
32 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterNodeInfo;
33 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket.SyncWebSocketClient;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class WebSocketManager extends WebSocketServlet {
39 private static final long serialVersionUID = -681665669062744439L;
// Class-wide SLF4J logger, named after this class.
41 private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
// Fully qualified class name; used in the "Create servlet" / "Configure provider" log lines.
42 private static final String APPLICATION_NAME = WebSocketManager.class.getName();
// Port inserted into the ws://<host>:<port>/websocket URLs built by initWSClients().
43 private static final int PORT = 8181;
// Message callback, assigned once in the two-argument constructor: either the caller's callback
// or a default that forwards each message to every URI in clusterNodeClients.
44 private final EventInputCallback rpcEventInputCallback;
// Configuration handed to the constructor; configure() uses it as its starting configuration.
45 private final AkkaConfig akkaConfig;
47 * timeout for websocket with no messages in ms
49 //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
// Value passed to the factory policy's setIdleTimeout() in configure(); the commented-out line
// above is an earlier 5-minute setting.
// NOTE(review): presumably 0 means "no idle timeout" for the Jetty policy - confirm.
50 private static final long IDLE_TIMEOUT = 0L;
// Websocket endpoints of the cluster nodes: filled by initWSClients() and iterated by the
// default message callback created in the constructor.
52 private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
54 public WebSocketManager() {
/**
 * Creates the servlet with the given akka configuration and message callback.
 *
 * NOTE(review): this extract shows two assignments to the final field rpcEventInputCallback
 * (the caller's cb and a default lambda) but not the condition that selects between them;
 * presumably cb is used when it is non-null and the default otherwise - confirm.
 *
 * @param akkaconfig configuration stored in akkaConfig and later read by configure()
 * @param cb callback stored in rpcEventInputCallback
 */
58 public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
60 this.akkaConfig = akkaconfig;
62 this.rpcEventInputCallback = cb;
// Default callback: push the incoming message to every registered cluster node endpoint.
64 this.rpcEventInputCallback = message -> {
65 LOG.debug("onMessagePushed: " + message);
66 SyncWebSocketClient client;
67 for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
// A fresh client is created per target URI for every message.
68 client = new SyncWebSocketClient(clientURI);
69 LOG.debug("try to push message to " + client.getURI());
// Going by its name, this opens the connection, sends and closes in one synchronous call
// - TODO confirm in SyncWebSocketClient.
70 client.openAndSendAndCloseSync(message);
74 LOG.info("Create servlet for {}", APPLICATION_NAME);
/**
 * Configures the websocket factory: applies the idle-timeout and message-size policy,
 * registers WebSocketManagerSocket as the socket class and, when the akka configuration
 * reports a cluster, registers the websocket clients for the cluster nodes.
 *
 * NOTE(review): the guard and try around AkkaConfig.load() are not visible in this extract;
 * presumably load() is only attempted when no configuration was passed to the constructor
 * - confirm.
 *
 * @param factory factory whose policy is adjusted and on which the socket class is registered
 */
78 public void configure(WebSocketServletFactory factory) {
79 LOG.info("Configure provider for {}", APPLICATION_NAME);
80 // apply IDLE_TIMEOUT as the idle timeout (the constant is 0L, not one second)
81 factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
// NOTE(review): a maximum binary message size of 1 presumably is meant to effectively
// reject binary messages - confirm.
82 factory.getPolicy().setMaxBinaryMessageSize(1);
// Text messages are limited to 64 * 1024.
83 factory.getPolicy().setMaxTextMessageSize(64 * 1024);
85 // register Socket as the WebSocket to create on Upgrade
86 factory.register(WebSocketManagerSocket.class);
// Start from the configuration supplied to the constructor.
88 AkkaConfig cfg = this.akkaConfig;
91 cfg = AkkaConfig.load();
92 } catch (Exception e) {
// Only the exception message is logged; cfg keeps the value it had before load() was called.
93 LOG.warn("problem loading akka config: " + e.getMessage());
// Clients for the other nodes are only set up when a cluster configuration is present.
96 if (cfg != null && cfg.isCluster()) {
97 this.initWSClients(cfg.getClusterConfig());
101 /**********************************************************
/**
 * Handles GET requests. When the request carries an "Upgrade" header, the response gets the
 * "Upgrade", "Connection" and "OtherHeaderB" headers set.
 *
 * NOTE(review): "XYZP", "OtherHeaderB" and "Value" look like placeholder values, and parts of
 * this method (anything besides the header calls, and the handling of requests without an
 * Upgrade header) are not visible in this extract - confirm against the full file.
 *
 * @param req incoming request; only its "Upgrade" header is inspected in the visible code
 * @param resp response on which the headers are set
 * @throws ServletException declared by the signature; no throwing statement is visible here
 * @throws IOException declared by the signature; no throwing statement is visible here
 */
106 protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
107 if (req.getHeader("Upgrade") != null) {
108 /* Accept upgrade request */
110 resp.setHeader("Upgrade", "XYZP");
111 resp.setHeader("Connection", "Upgrade");
112 resp.setHeader("OtherHeaderB", "Value");
/**
 * Walks the seed nodes of the cluster configuration and adds a websocket URI for them to
 * clusterNodeClients.
 *
 * NOTE(review): the body of the isMe() branch is not visible in this extract; presumably the
 * local node is skipped so that only the other nodes are registered - confirm.
 *
 * @param clusterConfig cluster configuration that supplies the seed nodes and the isMe() check
 */
116 private void initWSClients(ClusterConfig clusterConfig) {
117 for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
118 if (clusterConfig.isMe(nodeConfig)) {
// URL pattern: ws://<remote address of the node>:<PORT>/websocket
121 String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
123 LOG.debug("registering ws client for " + url);
124 clusterNodeClients.add(new URI(url));
125 } catch (URISyntaxException e) {
// A URL that cannot be parsed is reported as a warning; the exception itself is not logged.
126 LOG.warn("problem instantiating wsclient for url: " + url);