update websocketmanager
[ccsdk/features.git] / sdnr / wt / websocketmanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / websocketmanager / WebSocketManager.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager;
19
20 import java.io.IOException;
21 import java.net.URI;
22 import java.net.URISyntaxException;
23 import java.util.ArrayList;
24 import javax.servlet.ServletException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
28 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
29 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerSocket.EventInputCallback;
30 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig;
31 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterConfig;
32 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig.ClusterNodeInfo;
33 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.websocket.SyncWebSocketClient;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class WebSocketManager extends WebSocketServlet {
38
39     private static final long serialVersionUID = -681665669062744439L;
40
41     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
42     private static final String APPLICATION_NAME = WebSocketManager.class.getName();
43     private static final int PORT = 8181;
44     private final EventInputCallback rpcEventInputCallback;
45     private final AkkaConfig akkaConfig;
46     /**
47      * timeout for websocket with no messages in ms
48      */
49     //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
50     private static final long IDLE_TIMEOUT = 0L;
51
52     private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
53
54     public WebSocketManager() {
55         this(null, null);
56     }
57
58     public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
59         super();
60         this.akkaConfig = akkaconfig;
61         if (cb != null) {
62             this.rpcEventInputCallback = cb;
63         } else {
64             this.rpcEventInputCallback = message -> {
65                 LOG.debug("onMessagePushed: " + message);
66                 SyncWebSocketClient client;
67                 for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
68                     client = new SyncWebSocketClient(clientURI);
69                     LOG.debug("try to push message to " + client.getURI());
70                     client.openAndSendAndCloseSync(message);
71                 }
72             };
73         }
74         LOG.info("Create servlet for {}", APPLICATION_NAME);
75     }
76
77     @Override
78     public void configure(WebSocketServletFactory factory) {
79         LOG.info("Configure provider for {}", APPLICATION_NAME);
80         // set a second timeout
81         factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
82         factory.getPolicy().setMaxBinaryMessageSize(1);
83         factory.getPolicy().setMaxTextMessageSize(64 * 1024);
84
85         // register Socket as the WebSocket to create on Upgrade
86         factory.register(WebSocketManagerSocket.class);
87
88         AkkaConfig cfg = this.akkaConfig;
89         if (cfg == null) {
90             try {
91                 cfg = AkkaConfig.load();
92             } catch (Exception e) {
93                 LOG.warn("problem loading akka config: " + e.getMessage());
94             }
95         }
96         if (cfg != null && cfg.isCluster()) {
97             this.initWSClients(cfg.getClusterConfig());
98         }
99     }
100
101     /**********************************************************
102      * Private functions
103      */
104
105     @Override
106     protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
107         if (req.getHeader("Upgrade") != null) {
108             /* Accept upgrade request */
109             resp.setStatus(101);
110             resp.setHeader("Upgrade", "XYZP");
111             resp.setHeader("Connection", "Upgrade");
112             resp.setHeader("OtherHeaderB", "Value");
113         }
114     }
115
116     private void initWSClients(ClusterConfig clusterConfig) {
117         for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
118             if (clusterConfig.isMe(nodeConfig)) {
119                 continue;
120             }
121             String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
122             try {
123                 LOG.debug("registering ws client for " + url);
124                 clusterNodeClients.add(new URI(url));
125             } catch (URISyntaxException e) {
126                 LOG.warn("problem instantiating wsclient for url: " + url);
127             }
128         }
129     }
130 }