2 * Copyright (c) 2016 Wipro Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mwtn.impl;
12 import java.net.URISyntaxException;
13 import java.util.ArrayList;
14 import java.util.List;
16 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
17 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
18 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
19 import org.opendaylight.mwtn.impl.WebsocketImpl.EventInputCallback;
20 import org.opendaylight.mwtn.impl.utils.AkkaConfig;
21 import org.opendaylight.mwtn.impl.utils.AkkaConfig.ClusterConfig;
22 import org.opendaylight.mwtn.impl.utils.AkkaConfig.ClusterNodeInfo;
23 import org.opendaylight.mwtn.impl.websocket.SyncWebSocketClient;
24 import org.opendaylight.mwtn.impl.websocket.WebSocketServerInitializer;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import io.netty.bootstrap.ServerBootstrap;
30 import io.netty.channel.Channel;
31 import io.netty.channel.EventLoopGroup;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.socket.nio.NioServerSocketChannel;
35 public class WebsocketmanagerProvider implements BindingAwareProvider, AutoCloseable, Runnable {
37 private static final Logger LOG = LoggerFactory.getLogger(WebsocketmanagerProvider.class);
38 private static final int PORT = 8085;
40 private Thread webserverThread;
41 private final ServerBootstrap bootstrap = new ServerBootstrap();
42 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
43 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
44 private RpcRegistration<WebsocketmanagerService> websocketService;
46 private List<URI> clusterNodeClients=null;
47 private final EventInputCallback rpcEventInputCallback=new EventInputCallback() {
50 public void onMessagePushed(String message) throws Exception {
52 LOG.debug("onMessagePushed: "+message);
53 if(WebsocketmanagerProvider.this.clusterNodeClients!=null &&
54 WebsocketmanagerProvider.this.clusterNodeClients.size()>0)
56 SyncWebSocketClient client;
57 for(URI clientURI : WebsocketmanagerProvider.this.clusterNodeClients)
59 client=new SyncWebSocketClient(clientURI);
60 LOG.debug("try to push message to "+client.getURI());
61 client.openAndSendAndCloseSync(message);
70 public void onSessionInitiated(ProviderContext session) {
71 LOG.info("WebsocketmanagerProvider Session Initiated");
72 webserverThread = new Thread(this);
73 webserverThread.start();
74 websocketService = session.addRpcImplementation(WebsocketmanagerService.class, new WebsocketImpl(rpcEventInputCallback));
77 cfg=AkkaConfig.load();
78 } catch (Exception e) {
79 LOG.warn("problem loading akka config: "+e.getMessage());
81 if(cfg!=null && cfg.isCluster())
83 this.initWSClients(cfg.getClusterConfig());
87 private void initWSClients(ClusterConfig clusterConfig) {
88 clusterNodeClients=new ArrayList<URI>();
89 for(ClusterNodeInfo nodeConfig:clusterConfig.getSeedNodes())
91 if(clusterConfig.isMe(nodeConfig))
93 String url=String.format("ws://%s:%d/websocket", nodeConfig.getRemoteAddress(),PORT);
95 LOG.debug("registering ws client for "+url);
96 clusterNodeClients.add(new URI(url));
97 } catch (URISyntaxException e) {
98 LOG.warn("problem instantiating wsclient for url: "+url);
104 public void close() throws Exception {
105 LOG.info("WebsocketmanagerProvider Closed");
106 closeWebsocketServer();
107 if (websocketService != null) {
108 websocketService.close();
114 // TODO Auto-generated method stub
116 startWebSocketServer();
117 } catch (Exception e) {
118 LOG.warn("Exception occured while starting webSocket server {}",e);
122 private void closeWebsocketServer() {
123 if (bossGroup != null) {
125 bossGroup.shutdownGracefully();
126 } catch (Exception e) {
127 LOG.warn("Exception occured while starting webSocket server {}",e);
130 if (workerGroup != null) {
132 workerGroup.shutdownGracefully();
133 } catch (Exception e) {
134 LOG.warn("Exception occured while starting webSocket server {}",e);
139 public void startWebSocketServer() throws Exception {
141 bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
142 .childHandler(new WebSocketServerInitializer());
143 Channel channel = bootstrap.bind(PORT).sync().channel();
144 LOG.info("Web socket server started at port " + PORT + '.');
145 LOG.info("Open your browser and navigate to http://localhost:" + PORT + '/');
146 channel.closeFuture().sync();
147 } catch (Exception e) {
148 LOG.warn("Exception in start websocket server ======== " + e.toString());
150 bossGroup.shutdownGracefully();
151 workerGroup.shutdownGracefully();