2 * Copyright (c) 2017 highstreet technologies GmbH
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mwtn.devicemanager.impl;
11 import java.util.concurrent.ConcurrentHashMap;
13 import javax.annotation.Nullable;
15 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
16 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
17 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
19 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
20 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
21 import org.opendaylight.mwtn.aaiConnector.impl.AaiProviderClient;
22 import org.opendaylight.mwtn.base.database.HtDatabaseNode;
23 import org.opendaylight.mwtn.base.netconf.ONFCoreNetworkElementFactory;
24 import org.opendaylight.mwtn.base.netconf.ONFCoreNetworkElementRepresentation;
25 import org.opendaylight.mwtn.config.impl.AkkaConfig;
26 import org.opendaylight.mwtn.config.impl.EsConfig;
27 import org.opendaylight.mwtn.config.impl.GeoConfig;
28 import org.opendaylight.mwtn.config.impl.HtDevicemanagerConfiguration;
29 import org.opendaylight.mwtn.config.impl.PmConfig;
30 import org.opendaylight.mwtn.dcaeConnector.impl.DcaeProviderClient;
31 import org.opendaylight.mwtn.deviceMonitor.impl.DeviceMonitorImpl;
32 import org.opendaylight.mwtn.devicemanager.api.DeviceManagerService;
33 import org.opendaylight.mwtn.devicemanager.impl.database.service.HtDatabaseEventsService;
34 import org.opendaylight.mwtn.devicemanager.impl.listener.NetconfChangeListener;
35 import org.opendaylight.mwtn.devicemanager.impl.listener.ODLEventListener;
36 import org.opendaylight.mwtn.devicemanager.impl.xml.WebSocketServiceClient;
37 import org.opendaylight.mwtn.devicemanager.impl.xml.XmlMapper;
38 import org.opendaylight.mwtn.index.impl.IndexConfigService;
39 import org.opendaylight.mwtn.index.impl.IndexMwtnService;
40 import org.opendaylight.mwtn.index.impl.IndexUpdateService;
41 import org.opendaylight.mwtn.performancemanager.impl.PerformanceManagerImpl;
42 import org.opendaylight.mwtn.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
44 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
46 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.websocketmanager.rev150105.WebsocketmanagerService;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import com.google.common.base.Optional;
63 public class DeviceManagerImpl implements DeviceManagerService, BindingAwareProvider, AutoCloseable {
65 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
66 private static final String MYDBKEYNAME = "SDN-Controller";
68 // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
69 private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID = InstanceIdentifier
70 .create(NetworkTopology.class)
71 .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
73 private ProviderContext session;
74 private DataBroker dataBroker;
76 private WebSocketServiceClient webSocketService;
77 private HtDatabaseEventsService databaseClientEvents;
78 private ODLEventListener odlEventListener;
79 private NetconfChangeListener netconfChangeListener;
81 private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations = new ConcurrentHashMap<>();
82 private @Nullable PerformanceManagerImpl performanceManager = null;
83 private ProviderClient dcaeProviderClient;
84 private @Nullable AaiProviderClient aaiProviderClient;
85 private DeviceMonitorImpl deviceMonitor;
86 private IndexUpdateService updateService;
87 private IndexConfigService configService;
88 private IndexMwtnService mwtnService;
89 private HtDatabaseNode htDatabase;
90 private Boolean initialized = false;
93 public void onSessionInitiated(ProviderContext pSession) {
94 LOG.info("Session Initiated start");
96 this.session = pSession;
97 this.dataBroker = pSession.getSALService(DataBroker.class);
100 HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
101 AkkaConfig akkaConfig = null;
103 akkaConfig = AkkaConfig.load();
104 } catch (Exception e1) {
105 LOG.warn("error loading akka.conf: " + e1.getMessage());
107 GeoConfig geoConfig = null;
108 if (akkaConfig != null && akkaConfig.isCluster()) {
109 if (GeoConfig.fileExists()) {
111 LOG.debug("try to load geoconfig");
112 geoConfig = GeoConfig.load();
113 } catch (Exception err) {
114 LOG.warn("error loading geoconfig: " + err.getMessage());
117 LOG.debug("no geoconfig file found");
119 EsConfig dbConfig = config.getEs();
120 LOG.debug("esConfig=" + dbConfig.toString());
122 htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig,geoConfig);
124 // init Database Values only if singleNode or clusterMember=1
125 if (akkaConfig == null || akkaConfig.isSingleNode() || (akkaConfig != null && akkaConfig.isCluster()
126 && akkaConfig.getClusterConfig().getRoleMemberIndex() == 1)) {
127 // Create DB index if not existing and if database is running
128 this.configService = new IndexConfigService(htDatabase);
129 this.mwtnService = new IndexMwtnService(htDatabase);
132 this.webSocketService = new WebSocketServiceClient(pSession.getRpcService(WebsocketmanagerService.class),
136 this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
138 this.aaiProviderClient = new AaiProviderClient(config,this);
140 EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
142 if (emConfig == null) {
143 LOG.warn("No {} configuration available. Don't start event manager");
145 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
147 String myDbKeyNameExtended=MYDBKEYNAME+"-"+dbConfig.getCluster();
150 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService, databaseClientEvents,
151 dcaeProviderClient, null);
155 PmConfig configurationPM = config.getPm();
156 LOG.info("Performance manager configuration: {}", configurationPM);
157 if (!configurationPM.isPerformanceManagerEnabled()) {
159 LOG.info("No configuration available. Don't start performance manager");
161 @Nullable MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
162 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
163 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
166 // DUS (Database update service)
167 LOG.debug("start db update service");
168 this.updateService = new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(),
170 this.updateService.start();
173 // DeviceMonitor has to be available before netconfSubscriptionManager is
175 LOG.debug("start DeviceMonitor Service");
176 this.deviceMonitor = new DeviceMonitorImpl(odlEventListener);
178 // netconfSubscriptionManager should be the last one because this is a callback
180 LOG.debug("start NetconfSubscriptionManager Service");
181 // this.netconfSubscriptionManager = new
182 // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
183 // this.netconfSubscriptionManager.register();
184 this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
185 this.netconfChangeListener.register();
187 synchronized (initialized) {
191 LOG.info("Session Initiated end");
195 public void close() throws Exception {
196 LOG.info("DeviceManagerImpl closing ...");
198 close(performanceManager);
199 close(dcaeProviderClient);
200 close(aaiProviderClient);
201 close(deviceMonitor);
202 close(updateService, configService, mwtnService);
204 close(netconfChangeListener);
206 LOG.info("DeviceManagerImpl closing done");
210 * Used to close all Services, that should support AutoCloseable Pattern
215 private void close(AutoCloseable... toCloseList) throws Exception {
216 for (int t = 0; t < toCloseList.length; t++) {
217 if (toCloseList[t] != null)
218 toCloseList[t].close();
223 * For each mounted device a mountpoint is created and this listener is called.
226 public void startListenerOnNode(NodeId nNodeId, NetconfNode nNode) {
227 synchronized (networkElementRepresentations) {
229 String mountPointNodeName = nNodeId.getValue();
230 LOG.info("Starting Event listener on Netconf device :: Name : {}", mountPointNodeName);
232 if (networkElementRepresentations.containsKey(mountPointNodeName)) {
233 LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
237 LOG.warn("Devicemanager initialization still pending. Leave startup procedure. Mountpoint {}", mountPointNodeName);
241 MountPointService mountService = session.getSALService(MountPointService.class);
243 InstanceIdentifier<Node> instanceIdentifier = NETCONF_TOPO_IID.child(Node.class,
244 new NodeKey(new NodeId(mountPointNodeName)));
246 Optional<MountPoint> optionalMountPoint = null;
248 while (!(optionalMountPoint = mountService.getMountPoint(instanceIdentifier)).isPresent() && timeout > 0) {
250 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
254 } catch (InterruptedException e) {
255 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {} Time: {}",
256 mountPointNodeName, timeout);
260 if (!optionalMountPoint.isPresent()) {
261 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
266 // Mountpoint is present for sure
267 MountPoint mountPoint = optionalMountPoint.get();
269 DataBroker netconfNodeDataBroker = mountPoint.getService(DataBroker.class).orNull();
270 if (netconfNodeDataBroker == null) {
271 LOG.info("Mountpoint is slave mountpoint {}", mountPointNodeName);
275 LOG.info("Databroker service 1:{} 2:{}", dataBroker.hashCode(), netconfNodeDataBroker.hashCode());
276 // getNodeInfoTest(dataBroker);
279 // Setup microwaveEventListener for Notificationservice
281 // MicrowaveEventListener microwaveEventListener = new
282 // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
283 // xmlMapper, databaseClientEvents);
284 ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName, dataBroker,
285 webSocketService, databaseClientEvents, instanceIdentifier, netconfNodeDataBroker, dcaeProviderClient,
287 networkElementRepresentations.put(mountPointNodeName, ne);
288 ne.doRegisterMicrowaveEventListener(mountPoint);
290 // Register netconf stream
291 registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
293 // -- Read data from NE
294 ne.initialReadFromNetworkElement();
295 ne.initSynchronizationExtension();
297 // Setup Service that monitors registration/ deregistration of session
298 odlEventListener.registration(mountPointNodeName);
300 if (aaiProviderClient != null) {
301 aaiProviderClient.onDeviceRegistered(mountPointNodeName);
303 // -- Register NE to performance manager
304 if (performanceManager != null) {
305 performanceManager.registration(mountPointNodeName, ne);
308 deviceMonitor.deviceConnectIndication(mountPointNodeName, ne);
310 LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
317 public void removeListenerOnNode(NodeId nNodeId, NetconfNode nNode) {
318 String mountPointNodeName = nNodeId.getValue();
319 LOG.info("Removing NetworkElementRepresetations for device :: Name : {}", mountPointNodeName);
321 ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
323 int problems = ne.removeAllCurrentProblemsOfNode();
324 LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
325 if (odlEventListener != null)
326 odlEventListener.deRegistration(mountPointNodeName);
327 if (performanceManager != null) {
328 performanceManager.deRegistration(mountPointNodeName);
330 if (aaiProviderClient != null) {
331 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
334 LOG.info("No related ne object for mountpoint {} to deregister .", mountPointNodeName);
336 if (deviceMonitor != null)
337 deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
342 public void mountpointNodeCreation(NodeId nNodeId, NetconfNode nNode) {
343 String mountPointNodeName = nNodeId.getValue();
344 LOG.info("mountpointNodeCreation {} {}", nNodeId.getValue(), nNode.getConnectionStatus());
345 deviceMonitor.createMountpointIndication(mountPointNodeName);
349 public void mountpointNodeRemoved(NodeId nNodeId) {
350 String mountPointNodeName = nNodeId.getValue();
351 LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
352 deviceMonitor.removeMountpointIndication(mountPointNodeName);
356 * Do the stream creation for the device.
358 * @param mountPointNodeName
361 private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
363 final Optional<RpcConsumerRegistry> optionalRpcConsumerService = mountPoint
364 .getService(RpcConsumerRegistry.class);
365 if (optionalRpcConsumerService.isPresent()) {
366 final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
367 final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
368 if (rpcService == null) {
369 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
371 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder = new CreateSubscriptionInputBuilder();
372 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
373 LOG.info("Event listener triggering notification stream {} for node {}", streamName, mountPointNodeName);
375 CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
376 if (createSubscriptionInput == null) {
377 LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
379 rpcService.createSubscription(createSubscriptionInput);
381 } catch (NullPointerException e) {
382 LOG.warn("createSubscription failed");
386 LOG.warn("No RpcConsumerRegistry avaialble.");
394 * @return null or ne specific data
396 public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint( String mountpoint ) {
398 return networkElementRepresentations.get(mountpoint);