1c4d924cbae5d59a85d9d7f5dfbd75fa271fbf96
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17   */
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
19
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import javax.servlet.ServletException;
26 import javax.servlet.http.HttpServletRequest;
27 import javax.servlet.http.HttpServletResponse;
28 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
29 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
30 import org.json.JSONObject;
31 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.WebSocketManagerSocket.EventInputCallback;
32 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterConfig;
34 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterNodeInfo;
35 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket.SyncWebSocketClient;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutputBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
40 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class WebSocketManager extends WebSocketServlet implements WebsocketmanagerService {
47
48     private static final long serialVersionUID = -681665669062744439L;
49
50     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
51     private static final String APPLICATION_NAME = WebSocketManager.class.getName();
52     private static final int PORT = 8181;
53     private final EventInputCallback rpcEventInputCallback;
54     private final AkkaConfig akkaConfig;
55     /**
56      * timeout for websocket with no messages in ms
57       */
58     //private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
59     private static final long IDLE_TIMEOUT = 0L;
60
61     private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
62
63     public WebSocketManager() {
64         this(null, null);
65     }
66
67     public WebSocketManager(AkkaConfig akkaconfig, EventInputCallback cb) {
68         super();
69         this.akkaConfig = akkaconfig;
70         if (cb != null) {
71             this.rpcEventInputCallback = cb;
72         } else {
73             this.rpcEventInputCallback = message -> {
74                 LOG.debug("onMessagePushed: " + message);
75                 SyncWebSocketClient client;
76                 for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
77                     client = new SyncWebSocketClient(clientURI);
78                     LOG.debug("try to push message to " + client.getURI());
79                     client.openAndSendAndCloseSync(message);
80                 }
81             };
82         }
83         LOG.info("Create servlet for {}", APPLICATION_NAME);
84     }
85
86     @Override
87     public void configure(WebSocketServletFactory factory) {
88         LOG.info("Configure provider for {}", APPLICATION_NAME);
89         // set a second timeout
90         factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
91         factory.getPolicy().setMaxBinaryMessageSize(1);
92         factory.getPolicy().setMaxTextMessageSize(64 * 1024);
93
94         // register Socket as the WebSocket to create on Upgrade
95         factory.register(WebSocketManagerSocket.class);
96
97         AkkaConfig cfg = this.akkaConfig;
98         if (cfg == null) {
99             try {
100                 cfg = AkkaConfig.load();
101             } catch (Exception e) {
102                 LOG.warn("problem loading akka config: " + e.getMessage());
103             }
104         }
105         if (cfg != null && cfg.isCluster()) {
106             this.initWSClients(cfg.getClusterConfig());
107         }
108     }
109
110     // ODL in Dublin version generates ListenableFuture that is child of Future.
111     @Override
112     public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) {
113         LOG.debug("Send message '{}'", input);
114         RpcResultBuilder<WebsocketEventOutput> result;
115
116         final String eventAsXmlString = input.getXmlEvent();
117         if (eventAsXmlString != null) {
118             WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), eventAsXmlString);
119             try {
120                 JSONObject o = new JSONObject();
121                 o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName());
122                 o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType());
123                 o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent());
124                 this.rpcEventInputCallback.onMessagePushed(o.toString());
125
126                 WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
127                 outputBuilder.setResponse("OK");
128                 result = RpcResultBuilder.success(outputBuilder);
129             } catch (Exception err) {
130                 LOG.warn("problem pushing messsage to other nodes: " + err.getMessage());
131                 result = RpcResultBuilder.failed();
132                 result.withError(ErrorType.APPLICATION, "Exception", err);
133             }
134         } else {
135             String msg = "Emtpy event received";
136             LOG.warn(msg);
137             result = RpcResultBuilder.failed();
138             result.withError(ErrorType.APPLICATION, msg);
139         }
140         return result.buildFuture();
141     }
142
143     /**********************************************************
144      * Private functions
145       */
146
147     @Override
148     protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
149         if (req.getHeader("Upgrade") != null) {
150             /* Accept upgrade request  */
151             resp.setStatus(101);
152             resp.setHeader("Upgrade", "XYZP");
153             resp.setHeader("Connection", "Upgrade");
154             resp.setHeader("OtherHeaderB", "Value");
155         }
156     }
157
158     private void initWSClients(ClusterConfig clusterConfig) {
159         for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
160             if (clusterConfig.isMe(nodeConfig)) {
161                 continue;
162             }
163             String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
164             try {
165                 LOG.debug("registering ws client for " + url);
166                 clusterNodeClients.add(new URI(url));
167             } catch (URISyntaxException e) {
168                 LOG.warn("problem instantiating wsclient for url: " + url);
169             }
170         }
171     }
172 }