Merge "SDN-R apigateway add junit test"
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
19
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientDummyImpl;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
51 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
52 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
53 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
55 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
56 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
71 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75 @SuppressWarnings("deprecation")
76 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
77
78     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
79     private static final String APPLICATION_NAME = "DeviceManager";
80     private static final String MYDBKEYNAMEBASE = "SDN-Controller";
81
82     // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
83     private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
84             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
85                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
86     @SuppressWarnings("unused")
87     private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
88     // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
89
90     private DataBroker dataBroker = null;
91     private MountPointService mountPointService = null;
92     private RpcProviderRegistry rpcProviderRegistry = null;
93     @SuppressWarnings("unused")
94     private NotificationPublishService notificationPublishService = null;
95
96     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
97             new ConcurrentHashMap<>();
98
99     private WebSocketServiceClient webSocketService;
100     private HtDatabaseEventsService databaseClientEvents;
101     private ODLEventListener odlEventListener;
102     private NetconfChangeListener netconfChangeListener;
103     private DeviceManagerApiServiceImpl rpcApiService;
104     private @Nullable PerformanceManagerImpl performanceManager = null;
105     private ProviderClient dcaeProviderClient;
106     private ProviderClient aotsMProvider;
107     private @Nullable AaiProviderClient aaiProviderClient;
108     private DeviceMonitorImpl deviceMonitor;
109     private IndexUpdateService updateService;
110     private IndexConfigService configService;
111     private IndexMwtnService mwtnService;
112     private HtDatabaseNode htDatabase;
113     private Boolean devicemanagerInitializationOk = false;
114     private MaintenanceServiceImpl maintenanceService;
115     private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
116     private Thread threadDoClearCurrentFaultByNodename = null;
117     private int refreshCounter = 0;
118     private AkkaConfig akkaConfig;
119
120     // Blueprint 1
121     public DeviceManagerImpl() {
122         LOG.info("Creating provider for {}", APPLICATION_NAME);
123     }
124
125     public void setDataBroker(DataBroker dataBroker) {
126         this.dataBroker = dataBroker;
127     }
128
129     public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
130         this.rpcProviderRegistry = rpcProviderRegistry;
131     }
132
133     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
134         this.notificationPublishService = notificationPublishService;
135     }
136
137     public void setMountPointService(MountPointService mountPointService) {
138         this.mountPointService = mountPointService;
139     }
140
141     public void init() {
142
143         LOG.info("Session Initiated start {}", APPLICATION_NAME);
144
145         // Start RPC Service
146         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
147         // Get configuration
148         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
149         this.akkaConfig = null;
150         try {
151             this.akkaConfig = AkkaConfig.load();
152             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
153         } catch (Exception e1) {
154             LOG.warn("problem loading akka.conf: " + e1.getMessage());
155         }
156         GeoConfig geoConfig = null;
157         if (akkaConfig != null && akkaConfig.isCluster()) {
158             LOG.info("cluster mode detected");
159             if (GeoConfig.fileExists()) {
160                 try {
161                     LOG.debug("try to load geoconfig");
162                     geoConfig = GeoConfig.load();
163                 } catch (Exception err) {
164                     LOG.warn("problem loading geoconfig: " + err.getMessage());
165                 }
166             } else {
167                 LOG.debug("no geoconfig file found");
168             }
169         } else {
170             LOG.info("single node mode detected");
171         }
172
173         this.notificationDelayService = new NotificationDelayService<>(config);
174
175         EsConfig dbConfig = config.getEs();
176         LOG.debug("esConfig=" + dbConfig.toString());
177         // Start database
178         htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
179         if (htDatabase == null) {
180             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
181         } else {
182             // init Database Values only if singleNode or clusterMember=1
183             if (akkaConfig == null || akkaConfig.isSingleNode() || akkaConfig != null && akkaConfig.isCluster()
184                     && akkaConfig.getClusterConfig().getRoleMemberIndex() == 1) {
185                 // Create DB index if not existing and if database is running
186                 try {
187                     this.configService = new IndexConfigService(htDatabase);
188                     this.mwtnService = new IndexMwtnService(htDatabase);
189                 } catch (Exception e) {
190                     LOG.warn("Can not start ES access clients to provide database index config, mwtn. ", e);
191                 }
192             }
193             // start service for device maintenance service
194             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
195             // Websockets
196             try {
197                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
198             } catch (Exception e) {
199                 LOG.error("Can not start websocket service. Loading mock class.", e);
200                 this.webSocketService = new WebSocketServiceClientDummyImpl();
201             }
202             // DCAE
203             this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
204
205             this.aaiProviderClient = new AaiProviderClient(config, this);
206             // EM
207             EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
208
209             if (emConfig == null) {
210                 LOG.warn("No configuration available. Don't start event manager");
211             } else {
212                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
213
214                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
215
216
217                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
218                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
219             }
220
221             // PM
222             PmConfig configurationPM = config.getPm();
223             LOG.info("Performance manager configuration: {}", configurationPM);
224             if (!configurationPM.isPerformanceManagerEnabled()) {
225
226                 LOG.info("No configuration available. Don't start performance manager");
227             } else {
228                 @Nullable
229                 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
230                 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
231                 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
232             }
233
234             // DUS (Database update service)
235             LOG.debug("start db update service");
236             this.updateService =
237                     new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
238             this.updateService.start();
239
240             // RPC Service for specific services
241             this.rpcApiService.setMaintenanceService(this.maintenanceService);
242             this.rpcApiService.setResyncListener(this);
243             // DM
244             // DeviceMonitor has to be available before netconfSubscriptionManager is
245             // configured
246             LOG.debug("start DeviceMonitor Service");
247             this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener);
248
249             // netconfSubscriptionManager should be the last one because this is a callback
250             // service
251             LOG.debug("start NetconfSubscriptionManager Service");
252             // this.netconfSubscriptionManager = new
253             // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
254             // this.netconfSubscriptionManager.register();
255             this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
256             this.netconfChangeListener.register();
257
258             this.devicemanagerInitializationOk = true;
259         }
260         LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
261     }
262
263     @Override
264     public void close() throws Exception {
265         LOG.info("DeviceManagerImpl closing ...");
266
267         close(performanceManager);
268         close(dcaeProviderClient);
269         close(aaiProviderClient);
270         close(aotsMProvider);
271         close(deviceMonitor);
272         close(updateService, configService, mwtnService);
273         close(htDatabase);
274         close(netconfChangeListener);
275         close(maintenanceService);
276         close(rpcApiService);
277         close(notificationDelayService);
278
279         LOG.info("DeviceManagerImpl closing done");
280     }
281
282
283     /**
284      * Used to close all Services, that should support AutoCloseable Pattern
285      *
286      * @param toClose
287      * @throws Exception
288      */
289     private void close(AutoCloseable... toCloseList) throws Exception {
290         for (AutoCloseable element : toCloseList) {
291             if (element != null) {
292                 element.close();
293             }
294         }
295     }
296
297     /*-------------------------------------------------------------------------------------------
298      * Functions for interface DeviceManagerService
299      */
300
301     /**
302      * For each mounted device a mountpoint is created and this listener is called.
303      */
304     @Override
305     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
306
307         String mountPointNodeName = nNodeId.getValue();
308         LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
309
310         boolean preConditionMissing = false;
311         if (mountPointService == null) {
312             preConditionMissing = true;
313             LOG.warn("No mountservice available.");
314         }
315         if (!devicemanagerInitializationOk) {
316             preConditionMissing = true;
317             LOG.warn("Devicemanager initialization still pending.");
318         }
319         if (preConditionMissing) {
320             return;
321         }
322
323         if (networkElementRepresentations.containsKey(mountPointNodeName)) {
324             LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
325             return;
326         }
327
328         if (!isMaster(nNode)) {
329             // Change Devicemonitor-status to connected ... for non master mountpoints.
330             deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
331             return;
332         }
333
334         InstanceIdentifier<Node> instanceIdentifier =
335                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
336
337         Optional<MountPoint> optionalMountPoint = null;
338         int timeout = 10000;
339         while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent() && timeout > 0) {
340             LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
341             sleepMs(1000);
342             timeout -= 1000;
343         }
344
345         if (!optionalMountPoint.isPresent()) {
346             LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
347                     mountPointNodeName);
348             return;
349         }
350         // Mountpoint is present for sure
351         MountPoint mountPoint = optionalMountPoint.get();
352         //BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
353         LOG.info("Mountpoint with id: {} class {} toString {}", mountPoint.getIdentifier(), mountPoint.getClass().getName(), mountPoint);
354         Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
355
356         if (! optionalNetconfNodeDatabroker.isPresent()) {
357             LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
358             return;
359         }
360
361         DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
362         LOG.info("Master mountpoint {}", mountPointNodeName);
363         // getNodeInfoTest(dataBroker);
364
365         // create automatic empty maintenance entry into db before reading and listening for problems
366         this.maintenanceService.createIfNotExists(mountPointNodeName);
367
368         // Setup microwaveEventListener for Notificationservice
369
370         // MicrowaveEventListener microwaveEventListener = new
371         // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
372         // xmlMapper, databaseClientEvents);
373         ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName, dataBroker,
374                 webSocketService, databaseClientEvents, instanceIdentifier, netconfNodeDataBroker, dcaeProviderClient,
375                 aotsMProvider, maintenanceService, notificationDelayService);
376         networkElementRepresentations.put(mountPointNodeName, ne);
377         ne.doRegisterMicrowaveEventListener(mountPoint);
378
379         // Register netconf stream
380         registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
381
382         // -- Read data from NE
383         ne.initialReadFromNetworkElement();
384         ne.initSynchronizationExtension();
385
386         // Setup Service that monitors registration/ deregistration of session
387         odlEventListener.registration(mountPointNodeName);
388
389         if (aaiProviderClient != null) {
390             aaiProviderClient.onDeviceRegistered(mountPointNodeName);
391         }
392         // -- Register NE to performance manager
393         if (performanceManager != null) {
394             performanceManager.registration(mountPointNodeName, ne);
395         }
396
397         deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
398
399         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
400     }
401
402     @Override
403     public void enterNonConnectedState(NodeId nNodeId, NetconfNode nNode) {
404         String mountPointNodeName = nNodeId.getValue();
405         LOG.info("enter Non ConnectedState for device :: Name : {}", mountPointNodeName);
406
407         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
408         if (ne != null) {
409             // Handling transition mountpoint connected -> connecting
410             this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
411             int problems = ne.removeAllCurrentProblemsOfNode();
412             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
413             if (odlEventListener != null) {
414                 odlEventListener.deRegistration(mountPointNodeName);
415             }
416             if (performanceManager != null) {
417                 performanceManager.deRegistration(mountPointNodeName);
418             }
419             if (aaiProviderClient != null) {
420                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
421             }
422         } else {
423             // Handling -> create not connected mountpoint, or change other beside connected.
424             ConnectionStatus csts = nNode.getConnectionStatus();
425             if (csts != null) {
426                 odlEventListener.updateRegistration(mountPointNodeName, csts.getClass().getSimpleName(), csts.getName());
427             } else {
428                 LOG.info("Unknown connection status");
429             }
430         }
431         if (deviceMonitor != null) {
432             deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
433         }
434
435     }
436
437     @Override
438     public void removeMountpointState(NodeId nNodeId) {
439         String mountPointNodeName = nNodeId.getValue();
440         LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
441         deviceMonitor.removeMountpointIndication(mountPointNodeName);
442     }
443
444     /*-------------------------------------------------------------------------------------------
445      * Functions
446      */
447
448     /**
449      * Async RPC Interface implementation
450      */
451     @Override
452     public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
453             throws IllegalStateException {
454
455         if (this.databaseClientEvents == null) {
456             throw new IllegalStateException("dbEvents service not instantiated");
457         }
458
459         if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
460             throw new IllegalStateException("A clear task is already active");
461         } else {
462
463             // Create list of mountpoints if input is empty, using the content in ES
464             if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
465                 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
466             }
467
468             // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
469             // DeviceManager
470             final List<String> nodeNamesHandled = new ArrayList<>();
471             for (String mountpointName : nodeNamesInput) {
472                 LOG.info("Work with mountpoint {}", mountpointName);
473
474                 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
475
476                     // SDN Controller related alarms
477                     // -- can not be recreated on all nodes in connected state
478                     // -- would result in a DCAE/AAI Notification
479                     // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
480                     LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
481                     // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
482                     // nodeNamesHandled.add(mountpointName);
483
484                 } else {
485
486                     if (mountPointService != null) {
487                         InstanceIdentifier<Node> instanceIdentifier =
488                                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
489                         Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
490
491                         if (!optionalMountPoint.isPresent()) {
492                             LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
493                             this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
494                             nodeNamesHandled.add(mountpointName);
495                         } else {
496                             if (networkElementRepresentations.containsKey(mountpointName)) {
497                                 LOG.info("At node known mountpoint {}", mountpointName);
498                                 nodeNamesHandled.add(mountpointName);
499                             } else {
500                                 LOG.info("At node unknown mountpoint {}", mountpointName);
501                             }
502                         }
503                     }
504                 }
505             }
506
507             // Force a sync
508             if (this.deviceMonitor != null) {
509                 this.deviceMonitor.refreshAlarmsInDb();
510             }
511
512             threadDoClearCurrentFaultByNodename = new Thread(() -> {
513                 refreshCounter++;
514                 LOG.info("Start refresh mountpoint task {}", refreshCounter);
515                 // for(String nodeName:nodeNamesOutput) {
516                 for (String nodeName : nodeNamesHandled) {
517                     ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
518                     if (ne != null) {
519                         LOG.info("Refresh mountpoint {}", nodeName);
520                         ne.initialReadFromNetworkElement();
521                     } else {
522                         LOG.info("Unhandled mountpoint {}", nodeName);
523                     }
524                 }
525                 LOG.info("End refresh mountpoint task {}", refreshCounter);
526             });
527             threadDoClearCurrentFaultByNodename.start();
528             return nodeNamesHandled;
529         }
530     };
531
532     /**
533      * Indication if init() of devicemanager successfully done.
534      *
535      * @return true if init() was sucessfull. False if not done or not successfull.
536      */
537     public boolean isDevicemanagerInitializationOk() {
538         return this.devicemanagerInitializationOk;
539     }
540
541     /**
542      * Get initialization status of database.
543      *
544      * @return true if fully initialized false if not
545      */
546     public boolean isDatabaseInitializationFinished() {
547         return htDatabase == null ? false : htDatabase.getInitialized();
548     }
549
550     /*---------------------------------------------------------------------
551      * Private functions
552      */
553
554     /**
555      * Do the stream creation for the device.
556      *
557      * @param mountPointNodeName
558      * @param mountPoint
559      */
560     private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
561
562         final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
563                 mountPoint.getService(RpcConsumerRegistry.class);
564         if (optionalRpcConsumerService.isPresent()) {
565             final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
566             final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
567             if (rpcService == null) {
568                 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
569             } else {
570                 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
571                         new CreateSubscriptionInputBuilder();
572                 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
573                 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
574                         mountPointNodeName);
575                 try {
576                     CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
577                     if (createSubscriptionInput == null) {
578                         LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
579                     } else {
580                         rpcService.createSubscription(createSubscriptionInput);
581                     }
582                 } catch (NullPointerException e) {
583                     LOG.warn("createSubscription failed");
584                 }
585             }
586         } else {
587             LOG.warn("No RpcConsumerRegistry avaialble.");
588         }
589
590     }
591
592     /**
593      * Get NE object
594      *
595      * @param mountpoint mount point name
596      * @return null or NE specific data
597      */
598     public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
599
600         return networkElementRepresentations.get(mountpoint);
601
602     }
603
604     /* -- LOG related functions -- */
605
606
607     private boolean isInClusterMode() {
608         return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
609     }
610
611     private String getClusterNetconfNodeName() {
612         return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
613     }
614
615     private boolean isMaster(NetconfNode nnode) {
616         if (isInClusterMode()) {
617             LOG.debug("check if me is responsible for node");
618             String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
619                     : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
620             /*
621              * List<NodeStatus> clusterNodeStatusList=nnode.getClusteredConnectionStatus()==null?null:nnode.
622              * getClusteredConnectionStatus().getNodeStatus(); if(clusterNodeStatusList!=null) { for(NodeStatus
623              * s: clusterNodeStatusList) LOG.debug("node "+s.getNode()+
624              * " with status "+(s.getStatus()==null?"null":s.getStatus().getName())); }
625              */
626             String myNodeName = getClusterNetconfNodeName();
627             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
628             if (!masterNodeName.equals(myNodeName)) {
629                 LOG.debug("netconf change but me is not master for this node");
630                 return false;
631             }
632         }
633         return true;
634     }
635
636
637     private void sleepMs(int milliseconds) {
638         try {
639             Thread.sleep(milliseconds);
640         } catch (InterruptedException e) {
641             LOG.debug("Interrupted sleep");
642             // Restore interrupted state...
643             Thread.currentThread().interrupt();
644         }
645     }
646
647 }