18066128feb67e138cb942cbb153a3513af7f905
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.websocketmanager2;
19
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.net.URI;
22 import java.net.URISyntaxException;
23 import java.util.ArrayList;
24 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
25 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
26 import org.json.JSONObject;
27 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.WebSocketManagerSocket.EventInputCallback;
28 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig;
29 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterConfig;
30 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.utils.AkkaConfig.ClusterNodeInfo;
31 import org.onap.ccsdk.features.sdnr.wt.websocketmanager2.websocket.SyncWebSocketClient;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketEventOutputBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
36 import org.opendaylight.yangtools.yang.common.RpcResult;
37 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
38 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 public class WebSocketManager extends WebSocketServlet implements WebsocketmanagerService {
43
44     private static final long serialVersionUID = -681665669062744439L;
45
46     private static final Logger LOG = LoggerFactory.getLogger(WebSocketManager.class.getName());
47     private static final String APPLICATION_NAME = WebSocketManager.class.getName();
48     private static final int PORT = 8181;
49     private final EventInputCallback rpcEventInputCallback;
50
51     /**
52      * timeout for websocket with no messages in ms
53      */
54     private static final long IDLE_TIMEOUT = 5 * 60 * 1000L;
55
56     private final ArrayList<URI> clusterNodeClients = new ArrayList<>();
57
58     public WebSocketManager() {
59        super();
60        rpcEventInputCallback = message -> {
61         LOG.debug("onMessagePushed: " + message);
62         SyncWebSocketClient client;
63         for (URI clientURI : WebSocketManager.this.clusterNodeClients) {
64             client = new SyncWebSocketClient(clientURI);
65             LOG.debug("try to push message to " + client.getURI());
66             client.openAndSendAndCloseSync(message);
67         }
68     };
69         LOG.info("Create servlet for {}", APPLICATION_NAME);
70     }
71
72     @Override
73     public void configure(WebSocketServletFactory factory) {
74         LOG.info("Configure provider for {}", APPLICATION_NAME);
75         // set a second timeout
76         factory.getPolicy().setIdleTimeout(IDLE_TIMEOUT);
77         factory.getPolicy().setMaxBinaryMessageSize(1);
78         factory.getPolicy().setMaxTextMessageSize(64 * 1024);
79
80         // register Socket as the WebSocket to create on Upgrade
81         factory.register(WebSocketManagerSocket.class);
82
83         AkkaConfig cfg = null;
84         try {
85             cfg = AkkaConfig.load();
86         } catch (Exception e) {
87             LOG.warn("problem loading akka config: " + e.getMessage());
88         }
89         if (cfg != null && cfg.isCluster()) {
90             this.initWSClients(cfg.getClusterConfig());
91         }
92     }
93
94     // ODL in Dublin version generates ListenableFuture that is child of Future.
95     @Override
96     public ListenableFuture<RpcResult<WebsocketEventOutput>> websocketEvent(WebsocketEventInput input) {
97         LOG.debug("Send message '{}'", input);
98         RpcResultBuilder<WebsocketEventOutput> result;
99         try {
100             WebsocketEventOutputBuilder outputBuilder = new WebsocketEventOutputBuilder();
101             final String s = input.getXmlEvent();
102             WebSocketManagerSocket.broadCast(input.getNodeName(), input.getEventType(), s);
103             outputBuilder.setResponse("OK");
104             try {
105                 JSONObject o = new JSONObject();
106                 o.put(WebSocketManagerSocket.KEY_NODENAME, input.getNodeName());
107                 o.put(WebSocketManagerSocket.KEY_EVENTTYPE, input.getEventType());
108                 o.put(WebSocketManagerSocket.KEY_XMLEVENT, input.getXmlEvent());
109                 this.rpcEventInputCallback.onMessagePushed(o.toString());
110             } catch (Exception err) {
111                 LOG.warn("problem pushing messsage to other nodes: " + err.getMessage());
112             }
113             result = RpcResultBuilder.success(outputBuilder);
114         } catch (Exception e) {
115             LOG.warn("Socketproblem: {}", e);
116             result = RpcResultBuilder.failed();
117             result.withError(ErrorType.APPLICATION, "Exception", e);
118         }
119         return result.buildFuture();
120     }
121
122     /**********************************************************
123      * Private functions
124      */
125
126     private void initWSClients(ClusterConfig clusterConfig) {
127         for (ClusterNodeInfo nodeConfig : clusterConfig.getSeedNodes()) {
128             if (clusterConfig.isMe(nodeConfig)) {
129                 continue;
130             }
131             String url = String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(), PORT);
132             try {
133                 LOG.debug("registering ws client for " + url);
134                 clusterNodeClients.add(new URI(url));
135             } catch (URISyntaxException e) {
136                 LOG.warn("problem instantiating wsclient for url: " + url);
137             }
138         }
139     }
140 }