1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
51 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
52 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
53 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
55 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
56 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
74 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
76 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
77 private static final String APPLICATION_NAME = "DeviceManager";
78 private static final String MYDBKEYNAMEBASE = "SDN-Controller";
80 // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
81 private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
82 InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
83 new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
84 @SuppressWarnings("unused")
85 private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
86 // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
88 private DataBroker dataBroker = null;
89 private MountPointService mountPointService = null;
90 private RpcProviderRegistry rpcProviderRegistry = null;
91 @SuppressWarnings("unused")
92 private NotificationPublishService notificationPublishService = null;
94 private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
95 new ConcurrentHashMap<>();
97 private WebSocketServiceClient webSocketService;
98 private HtDatabaseEventsService databaseClientEvents;
99 private ODLEventListener odlEventListener;
100 private NetconfChangeListener netconfChangeListener;
101 private DeviceManagerApiServiceImpl rpcApiService;
102 private @Nullable PerformanceManagerImpl performanceManager = null;
103 private ProviderClient dcaeProviderClient;
104 private ProviderClient aotsMProvider;
105 private @Nullable AaiProviderClient aaiProviderClient;
106 private DeviceMonitorImpl deviceMonitor;
107 private IndexUpdateService updateService;
108 private IndexConfigService configService;
109 private IndexMwtnService mwtnService;
110 private HtDatabaseNode htDatabase;
111 private Boolean devicemanagerInitializationOk = false;
112 private MaintenanceServiceImpl maintenanceService;
113 private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
114 private Thread threadDoClearCurrentFaultByNodename = null;
115 private int refreshCounter = 0;
116 private AkkaConfig akkaConfig;
/**
 * Creates the provider instance. Only writes a log entry; the collaborating services are handed in
 * through the setters afterwards.
 */
public DeviceManagerImpl() {
    LOG.info("Creating provider for {}", APPLICATION_NAME);
}
123 public void setDataBroker(DataBroker dataBroker) {
124 this.dataBroker = dataBroker;
127 public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
128 this.rpcProviderRegistry = rpcProviderRegistry;
132 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
133 this.notificationPublishService = notificationPublishService;
136 public void setMountPointService(MountPointService mountPointService) {
137 this.mountPointService = mountPointService;
142 LOG.info("Session Initiated start {}", APPLICATION_NAME);
145 this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
147 HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
148 this.akkaConfig = null;
150 this.akkaConfig = AkkaConfig.load();
151 LOG.debug("akka.conf loaded: " + akkaConfig.toString());
152 } catch (Exception e1) {
153 LOG.warn("problem loading akka.conf: " + e1.getMessage());
155 GeoConfig geoConfig = null;
156 if (akkaConfig != null && akkaConfig.isCluster()) {
157 LOG.info("cluster mode detected");
158 if (GeoConfig.fileExists()) {
160 LOG.debug("try to load geoconfig");
161 geoConfig = GeoConfig.load();
162 } catch (Exception err) {
163 LOG.warn("problem loading geoconfig: " + err.getMessage());
166 LOG.debug("no geoconfig file found");
169 LOG.info("single node mode detected");
172 this.notificationDelayService = new NotificationDelayService<>(config);
174 EsConfig dbConfig = config.getEs();
175 LOG.debug("esConfig=" + dbConfig.toString());
177 htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
178 if (htDatabase == null) {
179 LOG.error("Can only run with local database. Stop initialization of devicemanager.");
181 // init Database Values only if singleNode or clusterMember=1
182 if (akkaConfig == null || akkaConfig.isSingleNode() || akkaConfig != null && akkaConfig.isCluster()
183 && akkaConfig.getClusterConfig().getRoleMemberIndex() == 1) {
184 // Create DB index if not existing and if database is running
186 this.configService = new IndexConfigService(htDatabase);
187 this.mwtnService = new IndexMwtnService(htDatabase);
188 } catch (Exception e) {
189 LOG.warn("Can not start ES access clients to provide database index config, mwtn. ",e);
192 // start service for device maintenance service
193 this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
196 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
197 } catch (Exception e) {
198 LOG.error("Can not start websocket service. Loading mock class.", e);
199 this.webSocketService = new WebSocketServiceClientImpl();
202 this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
204 this.aaiProviderClient = new AaiProviderClient(config, this);
206 EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
208 if (emConfig == null) {
209 LOG.warn("No configuration available. Don't start event manager");
211 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
213 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
216 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
217 databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
221 PmConfig configurationPM = config.getPm();
222 LOG.info("Performance manager configuration: {}", configurationPM);
223 if (!configurationPM.isPerformanceManagerEnabled()) {
225 LOG.info("No configuration available. Don't start performance manager");
228 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
229 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
230 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
233 // DUS (Database update service)
234 LOG.debug("start db update service");
236 new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
237 this.updateService.start();
239 // RPC Service for specific services
240 this.rpcApiService.setMaintenanceService(this.maintenanceService);
241 this.rpcApiService.setResyncListener(this);
243 // DeviceMonitor has to be available before netconfSubscriptionManager is
245 LOG.debug("start DeviceMonitor Service");
246 this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener);
248 // netconfSubscriptionManager should be the last one because this is a callback
250 LOG.debug("start NetconfSubscriptionManager Service");
251 // this.netconfSubscriptionManager = new
252 // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
253 // this.netconfSubscriptionManager.register();
254 this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
255 this.netconfChangeListener.register();
257 this.devicemanagerInitializationOk = true;
259 LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
263 public void close() throws Exception {
264 LOG.info("DeviceManagerImpl closing ...");
266 close(performanceManager);
267 close(dcaeProviderClient);
268 close(aaiProviderClient);
269 close(aotsMProvider);
270 close(deviceMonitor);
271 close(updateService, configService, mwtnService);
273 close(netconfChangeListener);
274 close(maintenanceService);
275 close(rpcApiService);
276 close(notificationDelayService);
278 LOG.info("DeviceManagerImpl closing done");
283 * Used to close all Services, that should support AutoCloseable Pattern
288 private void close(AutoCloseable... toCloseList) throws Exception {
289 for (AutoCloseable element : toCloseList) {
290 if (element != null) {
297 * For each mounted device a mountpoint is created and this listener is called.
301 public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
303 String mountPointNodeName = nNodeId.getValue();
304 LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
306 boolean preConditionMissing = false;
307 if (mountPointService == null) {
308 preConditionMissing = true;
309 LOG.warn("No mountservice available.");
311 if (!devicemanagerInitializationOk) {
312 preConditionMissing = true;
313 LOG.warn("Devicemanager initialization still pending.");
315 if (preConditionMissing) {
319 if (networkElementRepresentations.containsKey(mountPointNodeName)) {
320 LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
324 if (!isMaster(nNode)) {
325 // Change Devicemonitor-status to connected ... for non master mountpoints.
326 deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
330 InstanceIdentifier<Node> instanceIdentifier =
331 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
333 Optional<MountPoint> optionalMountPoint = null;
335 while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent() && timeout > 0) {
337 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
341 } catch (InterruptedException e) {
342 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {} Time: {}",
343 mountPointNodeName, timeout);
344 // Restore interrupted state...
345 Thread.currentThread().interrupt();
349 if (!optionalMountPoint.isPresent()) {
350 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
354 // Mountpoint is present for sure
355 MountPoint mountPoint = optionalMountPoint.get();
357 DataBroker netconfNodeDataBroker = mountPoint.getService(DataBroker.class).orNull();
358 if (netconfNodeDataBroker == null) {
359 LOG.info("Mountpoint is slave mountpoint {}", mountPointNodeName);
363 LOG.info("Databroker service 1:{} 2:{}", dataBroker.hashCode(), netconfNodeDataBroker.hashCode());
364 // getNodeInfoTest(dataBroker);
366 // create automatic empty maintenance entry into db before reading and listening for problems
367 this.maintenanceService.createIfNotExists(mountPointNodeName);
369 // Setup microwaveEventListener for Notificationservice
371 // MicrowaveEventListener microwaveEventListener = new
372 // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
373 // xmlMapper, databaseClientEvents);
374 ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName, dataBroker,
375 webSocketService, databaseClientEvents, instanceIdentifier, netconfNodeDataBroker, dcaeProviderClient,
376 aotsMProvider, maintenanceService, notificationDelayService);
377 networkElementRepresentations.put(mountPointNodeName, ne);
378 ne.doRegisterMicrowaveEventListener(mountPoint);
380 // Register netconf stream
381 registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
383 // -- Read data from NE
384 ne.initialReadFromNetworkElement();
385 ne.initSynchronizationExtension();
387 // Setup Service that monitors registration/ deregistration of session
388 odlEventListener.registration(mountPointNodeName);
390 if (aaiProviderClient != null) {
391 aaiProviderClient.onDeviceRegistered(mountPointNodeName);
393 // -- Register NE to performance manager
394 if (performanceManager != null) {
395 performanceManager.registration(mountPointNodeName, ne);
398 deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
400 LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
403 // removeListenerOnNode
405 public void leaveConnectedState(NodeId nNodeId, NetconfNode nNode) {
406 String mountPointNodeName = nNodeId.getValue();
407 LOG.info("leaveConnectedState for device :: Name : {}", mountPointNodeName);
409 this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
410 ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
412 int problems = ne.removeAllCurrentProblemsOfNode();
413 LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
414 if (odlEventListener != null) {
415 odlEventListener.deRegistration(mountPointNodeName);
417 if (performanceManager != null) {
418 performanceManager.deRegistration(mountPointNodeName);
420 if (aaiProviderClient != null) {
421 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
424 LOG.info("No related ne object for mountpoint {} to deregister .", mountPointNodeName);
426 if (deviceMonitor != null) {
427 deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
/*
 * Disabled callback, kept for reference:
 *
 * @Override public void mountpointNodeCreation(NodeId nNodeId, NetconfNode nNode) { String
 * mountPointNodeName = nNodeId.getValue(); LOG.info("mountpointNodeCreation {} {}",
 * nNodeId.getValue(), nNode.getConnectionStatus());
 * deviceMonitor.createMountpointIndication(mountPointNodeName); }
 */
439 public void mountpointNodeRemoved(NodeId nNodeId) {
440 String mountPointNodeName = nNodeId.getValue();
441 LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
442 deviceMonitor.removeMountpointIndication(mountPointNodeName);
446 * Async RPC Interface implementation
449 public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
450 throws IllegalStateException {
452 if (this.databaseClientEvents == null) {
453 throw new IllegalStateException("dbEvents service not instantiated");
456 if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
457 throw new IllegalStateException("A clear task is already active");
460 // Create list of mountpoints if input is empty, using the content in ES
461 if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
462 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
465 // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
467 final List<String> nodeNamesHandled = new ArrayList<>();
468 for (String mountpointName : nodeNamesInput) {
469 LOG.info("Work with mountpoint {}", mountpointName);
471 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
473 // SDN Controller related alarms
474 // -- can not be recreated on all nodes in connected state
475 // -- would result in a DCAE/AAI Notification
476 // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
477 LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
478 // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
479 // nodeNamesHandled.add(mountpointName);
483 if (mountPointService != null) {
484 InstanceIdentifier<Node> instanceIdentifier =
485 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
486 Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
488 if (!optionalMountPoint.isPresent()) {
489 LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
490 this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
491 nodeNamesHandled.add(mountpointName);
493 if (networkElementRepresentations.containsKey(mountpointName)) {
494 LOG.info("At node known mountpoint {}", mountpointName);
495 nodeNamesHandled.add(mountpointName);
497 LOG.info("At node unknown mountpoint {}", mountpointName);
505 if (this.deviceMonitor != null) {
506 this.deviceMonitor.refreshAlarmsInDb();
509 threadDoClearCurrentFaultByNodename = new Thread(() -> {
511 LOG.info("Start refresh mountpoint task {}", refreshCounter);
512 // for(String nodeName:nodeNamesOutput) {
513 for (String nodeName : nodeNamesHandled) {
514 ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
516 LOG.info("Refresh mountpoint {}", nodeName);
517 ne.initialReadFromNetworkElement();
519 LOG.info("Unhandled mountpoint {}", nodeName);
522 LOG.info("End refresh mountpoint task {}", refreshCounter);
524 threadDoClearCurrentFaultByNodename.start();
525 return nodeNamesHandled;
530 * Indication if init() of devicemanager successfully done.
532 * @return true if init() was sucessfull. False if not done or not successfull.
534 public boolean isDevicemanagerInitializationOk() {
535 return this.devicemanagerInitializationOk;
539 * Get initialization status of database.
541 * @return true if fully initialized false if not
543 public boolean isDatabaseInitializationFinished() {
544 return htDatabase == null ? false : htDatabase.getInitialized();
/*--------------------------------------------------------------------- */
552 * Do the stream creation for the device.
554 * @param mountPointNodeName
557 private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
559 final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
560 mountPoint.getService(RpcConsumerRegistry.class);
561 if (optionalRpcConsumerService.isPresent()) {
562 final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
563 final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
564 if (rpcService == null) {
565 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
567 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
568 new CreateSubscriptionInputBuilder();
569 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
570 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
573 CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
574 if (createSubscriptionInput == null) {
575 LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
577 rpcService.createSubscription(createSubscriptionInput);
579 } catch (NullPointerException e) {
580 LOG.warn("createSubscription failed");
584 LOG.warn("No RpcConsumerRegistry avaialble.");
592 * @param mountpoint mount point name
593 * @return null or NE specific data
595 public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
597 return networkElementRepresentations.get(mountpoint);
601 /* -- LOG related functions -- */
604 private boolean isInClusterMode() {
605 return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
608 private String getClusterNetconfNodeName() {
609 return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
612 private boolean isMaster(NetconfNode nnode) {
613 if (isInClusterMode()) {
614 LOG.debug("check if me is responsible for node");
615 String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
616 : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
618 * List<NodeStatus> clusterNodeStatusList=nnode.getClusteredConnectionStatus()==null?null:nnode.
619 * getClusteredConnectionStatus().getNodeStatus(); if(clusterNodeStatusList!=null) { for(NodeStatus
620 * s: clusterNodeStatusList) LOG.debug("node "+s.getNode()+
621 * " with status "+(s.getStatus()==null?"null":s.getStatus().getName())); }
623 String myNodeName = getClusterNetconfNodeName();
624 LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
625 if (!masterNodeName.equals(myNodeName)) {
626 LOG.debug("netconf change but me is not master for this node");