31ac954415fd14110bcf8ee7858c03783a8af584
[ccsdk/apps.git] / sdnr / wireless-transport / code-Carbon-SR1 / apps / websocketmanager / impl / src / main / java / org / opendaylight / mwtn / impl / WebsocketmanagerProvider.java
1 /*
2 * Copyright (c) 2016 Wipro Ltd. and others. All rights reserved.
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
7 */
8
9 package org.opendaylight.mwtn.impl;
10
11 import java.net.URI;
12 import java.net.URISyntaxException;
13 import java.util.ArrayList;
14 import java.util.List;
15
16 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
17 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
18 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
19 import org.opendaylight.mwtn.impl.WebsocketImpl.EventInputCallback;
20 import org.opendaylight.mwtn.impl.utils.AkkaConfig;
21 import org.opendaylight.mwtn.impl.utils.AkkaConfig.ClusterConfig;
22 import org.opendaylight.mwtn.impl.utils.AkkaConfig.ClusterNodeInfo;
23 import org.opendaylight.mwtn.impl.websocket.SyncWebSocketClient;
24 import org.opendaylight.mwtn.impl.websocket.WebSocketServerInitializer;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import io.netty.bootstrap.ServerBootstrap;
30 import io.netty.channel.Channel;
31 import io.netty.channel.EventLoopGroup;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.nio.NioServerSocketChannel;
34
35 public class WebsocketmanagerProvider implements BindingAwareProvider, AutoCloseable, Runnable {
36
37     private static final Logger LOG = LoggerFactory.getLogger(WebsocketmanagerProvider.class);
38     private static final int PORT = 8085;
39
40     private Thread webserverThread;
41     private final ServerBootstrap bootstrap = new ServerBootstrap();
42     private final EventLoopGroup bossGroup = new NioEventLoopGroup();
43     private final EventLoopGroup workerGroup = new NioEventLoopGroup();
44     private RpcRegistration<WebsocketmanagerService> websocketService;
45
46     private List<URI> clusterNodeClients=null;
47         private final EventInputCallback rpcEventInputCallback=new EventInputCallback() {
48
49                 @Override
50                 public void onMessagePushed(String message) throws Exception {
51
52                         LOG.debug("onMessagePushed: "+message);
53                         if(WebsocketmanagerProvider.this.clusterNodeClients!=null &&
54                                         WebsocketmanagerProvider.this.clusterNodeClients.size()>0)
55                         {
56                                 SyncWebSocketClient client;
57                                 for(URI clientURI : WebsocketmanagerProvider.this.clusterNodeClients)
58                                 {
59                                         client=new SyncWebSocketClient(clientURI);
60                                         LOG.debug("try to push message to "+client.getURI());
61                                         client.openAndSendAndCloseSync(message);
62
63                                         //client.close();
64                                 }
65                         }
66                 }
67         };
68
69     @Override
70     public void onSessionInitiated(ProviderContext session) {
71         LOG.info("WebsocketmanagerProvider Session Initiated");
72         webserverThread = new Thread(this);
73         webserverThread.start();
74         websocketService = session.addRpcImplementation(WebsocketmanagerService.class, new WebsocketImpl(rpcEventInputCallback));
75         AkkaConfig cfg =null;
76         try {
77                          cfg=AkkaConfig.load();
78                         } catch (Exception e) {
79                         LOG.warn("problem loading akka config: "+e.getMessage());
80                 }
81         if(cfg!=null && cfg.isCluster())
82                 {
83                         this.initWSClients(cfg.getClusterConfig());
84                 }
85     }
86
87     private void initWSClients(ClusterConfig clusterConfig) {
88         clusterNodeClients=new ArrayList<URI>();
89         for(ClusterNodeInfo nodeConfig:clusterConfig.getSeedNodes())
90         {
91                 if(clusterConfig.isMe(nodeConfig))
92                         continue;
93                 String url=String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(),PORT);
94                 try {
95                         LOG.debug("registering ws client for "+url);
96                                 clusterNodeClients.add(new URI(url));
97                         } catch (URISyntaxException e) {
98                                 LOG.warn("problem instantiating wsclient for url: "+url);
99                         }
100         }
101         }
102
103         @Override
104     public void close() throws Exception {
105         LOG.info("WebsocketmanagerProvider Closed");
106         closeWebsocketServer();
107         if (websocketService != null) {
108             websocketService.close();
109         }
110     }
111
112     @Override
113     public void run() {
114         // TODO Auto-generated method stub
115         try {
116             startWebSocketServer();
117         } catch (Exception e) {
118             LOG.warn("Exception occured while starting webSocket server {}",e);
119         }
120     }
121
122     private void closeWebsocketServer() {
123         if (bossGroup != null) {
124             try {
125                 bossGroup.shutdownGracefully();
126             } catch (Exception e) {
127                 LOG.warn("Exception occured while starting webSocket server {}",e);
128             }
129         }
130         if (workerGroup != null) {
131             try {
132                 workerGroup.shutdownGracefully();
133             } catch (Exception e) {
134                 LOG.warn("Exception occured while starting webSocket server {}",e);
135             }
136         }
137     }
138
139     public void startWebSocketServer() throws Exception {
140         try {
141             bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
142                     .childHandler(new WebSocketServerInitializer());
143             Channel channel = bootstrap.bind(PORT).sync().channel();
144             LOG.info("Web socket server started at port " + PORT + '.');
145             LOG.info("Open your browser and navigate to http://localhost:" + PORT + '/');
146             channel.closeFuture().sync();
147         } catch (Exception e) {
148             LOG.warn("Exception in start websocket server ======== " + e.toString());
149         } finally {
150             bossGroup.shutdownGracefully();
151             workerGroup.shutdownGracefully();
152         }
153     }
154
155 }