Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
19
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
51 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
52 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
53 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
54 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
55 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
56 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
58 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
70 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
75
76     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
77     private static final String APPLICATION_NAME = "DeviceManager";
78     private static final String MYDBKEYNAMEBASE = "SDN-Controller";
79
80     // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
81     private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
82             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
83                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
84     @SuppressWarnings("unused")
85     private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
86     // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
87
88     private DataBroker dataBroker = null;
89     private MountPointService mountPointService = null;
90     private RpcProviderRegistry rpcProviderRegistry = null;
91     @SuppressWarnings("unused")
92     private NotificationPublishService notificationPublishService = null;
93
94     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
95             new ConcurrentHashMap<>();
96
97     private WebSocketServiceClient webSocketService;
98     private HtDatabaseEventsService databaseClientEvents;
99     private ODLEventListener odlEventListener;
100     private NetconfChangeListener netconfChangeListener;
101     private DeviceManagerApiServiceImpl rpcApiService;
102     private @Nullable PerformanceManagerImpl performanceManager = null;
103     private ProviderClient dcaeProviderClient;
104     private ProviderClient aotsMProvider;
105     private @Nullable AaiProviderClient aaiProviderClient;
106     private DeviceMonitorImpl deviceMonitor;
107     private IndexUpdateService updateService;
108     private IndexConfigService configService;
109     private IndexMwtnService mwtnService;
110     private HtDatabaseNode htDatabase;
111     private Boolean devicemanagerInitializationOk = false;
112     private MaintenanceServiceImpl maintenanceService;
113     private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
114     private Thread threadDoClearCurrentFaultByNodename = null;
115     private int refreshCounter = 0;
116     private AkkaConfig akkaConfig;
117
118     // Blueprint 1
119     public DeviceManagerImpl() {
120         LOG.info("Creating provider for {}", APPLICATION_NAME);
121     }
122
123     public void setDataBroker(DataBroker dataBroker) {
124         this.dataBroker = dataBroker;
125     }
126
127     public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
128         this.rpcProviderRegistry = rpcProviderRegistry;
129
130     }
131
132     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
133         this.notificationPublishService = notificationPublishService;
134     }
135
136     public void setMountPointService(MountPointService mountPointService) {
137         this.mountPointService = mountPointService;
138     }
139
140     public void init() {
141
142         LOG.info("Session Initiated start {}", APPLICATION_NAME);
143
144         // Start RPC Service
145         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
146         // Get configuration
147         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
148         this.akkaConfig = null;
149         try {
150             this.akkaConfig = AkkaConfig.load();
151             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
152         } catch (Exception e1) {
153             LOG.warn("problem loading akka.conf: " + e1.getMessage());
154         }
155         GeoConfig geoConfig = null;
156         if (akkaConfig != null && akkaConfig.isCluster()) {
157             LOG.info("cluster mode detected");
158             if (GeoConfig.fileExists()) {
159                 try {
160                     LOG.debug("try to load geoconfig");
161                     geoConfig = GeoConfig.load();
162                 } catch (Exception err) {
163                     LOG.warn("problem loading geoconfig: " + err.getMessage());
164                 }
165             } else {
166                 LOG.debug("no geoconfig file found");
167             }
168         } else {
169             LOG.info("single node mode detected");
170         }
171
172         this.notificationDelayService = new NotificationDelayService<>(config);
173
174         EsConfig dbConfig = config.getEs();
175         LOG.debug("esConfig=" + dbConfig.toString());
176         // Start database
177         htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
178         if (htDatabase == null) {
179             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
180         } else {
181             // init Database Values only if singleNode or clusterMember=1
182             if (akkaConfig == null || akkaConfig.isSingleNode() || akkaConfig != null && akkaConfig.isCluster()
183                     && akkaConfig.getClusterConfig().getRoleMemberIndex() == 1) {
184                 // Create DB index if not existing and if database is running
185                 try {
186                     this.configService = new IndexConfigService(htDatabase);
187                     this.mwtnService = new IndexMwtnService(htDatabase);
188                 } catch (Exception e) {
189                     LOG.warn("Can not start ES access clients to provide database index config, mwtn. ",e);
190                 }
191             }
192             // start service for device maintenance service
193             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
194             // Websockets
195             try {
196                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
197             } catch (Exception e) {
198                 LOG.error("Can not start websocket service. Loading mock class.", e);
199                 this.webSocketService = new WebSocketServiceClientImpl();
200             }
201             // DCAE
202             this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
203
204             this.aaiProviderClient = new AaiProviderClient(config, this);
205             // EM
206             EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
207
208             if (emConfig == null) {
209                 LOG.warn("No configuration available. Don't start event manager");
210             } else {
211                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
212
213                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
214
215
216                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
217                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
218             }
219
220             // PM
221             PmConfig configurationPM = config.getPm();
222             LOG.info("Performance manager configuration: {}", configurationPM);
223             if (!configurationPM.isPerformanceManagerEnabled()) {
224
225                 LOG.info("No configuration available. Don't start performance manager");
226             } else {
227                 @Nullable
228                 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
229                 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
230                 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
231             }
232
233             // DUS (Database update service)
234             LOG.debug("start db update service");
235             this.updateService =
236                     new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
237             this.updateService.start();
238
239             // RPC Service for specific services
240             this.rpcApiService.setMaintenanceService(this.maintenanceService);
241             this.rpcApiService.setResyncListener(this);
242             // DM
243             // DeviceMonitor has to be available before netconfSubscriptionManager is
244             // configured
245             LOG.debug("start DeviceMonitor Service");
246             this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener);
247
248             // netconfSubscriptionManager should be the last one because this is a callback
249             // service
250             LOG.debug("start NetconfSubscriptionManager Service");
251             // this.netconfSubscriptionManager = new
252             // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
253             // this.netconfSubscriptionManager.register();
254             this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
255             this.netconfChangeListener.register();
256
257             this.devicemanagerInitializationOk = true;
258         }
259         LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
260     }
261
262     @Override
263     public void close() throws Exception {
264         LOG.info("DeviceManagerImpl closing ...");
265
266         close(performanceManager);
267         close(dcaeProviderClient);
268         close(aaiProviderClient);
269         close(aotsMProvider);
270         close(deviceMonitor);
271         close(updateService, configService, mwtnService);
272         close(htDatabase);
273         close(netconfChangeListener);
274         close(maintenanceService);
275         close(rpcApiService);
276         close(notificationDelayService);
277
278         LOG.info("DeviceManagerImpl closing done");
279     }
280
281
282     /**
283      * Used to close all Services, that should support AutoCloseable Pattern
284      *
285      * @param toClose
286      * @throws Exception
287      */
288     private void close(AutoCloseable... toCloseList) throws Exception {
289         for (AutoCloseable element : toCloseList) {
290             if (element != null) {
291                 element.close();
292             }
293         }
294     }
295
296     /**
297      * For each mounted device a mountpoint is created and this listener is called.
298      *
299      */
300     @Override
301     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
302
303         String mountPointNodeName = nNodeId.getValue();
304         LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
305
306         boolean preConditionMissing = false;
307         if (mountPointService == null) {
308             preConditionMissing = true;
309             LOG.warn("No mountservice available.");
310         }
311         if (!devicemanagerInitializationOk) {
312             preConditionMissing = true;
313             LOG.warn("Devicemanager initialization still pending.");
314         }
315         if (preConditionMissing) {
316             return;
317         }
318
319         if (networkElementRepresentations.containsKey(mountPointNodeName)) {
320             LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
321             return;
322         }
323
324         if (!isMaster(nNode)) {
325             // Change Devicemonitor-status to connected ... for non master mountpoints.
326             deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
327             return;
328         }
329
330         InstanceIdentifier<Node> instanceIdentifier =
331                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
332
333         Optional<MountPoint> optionalMountPoint = null;
334         int timeout = 10000;
335         while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent() && timeout > 0) {
336
337             LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
338             try {
339                 Thread.sleep(1000);
340                 timeout -= 1000;
341             } catch (InterruptedException e) {
342                 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {} Time: {}",
343                         mountPointNodeName, timeout);
344                 // Restore interrupted state...
345                 Thread.currentThread().interrupt();
346             }
347         }
348
349         if (!optionalMountPoint.isPresent()) {
350             LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
351                     mountPointNodeName);
352             return;
353         }
354         // Mountpoint is present for sure
355         MountPoint mountPoint = optionalMountPoint.get();
356
357         DataBroker netconfNodeDataBroker = mountPoint.getService(DataBroker.class).orNull();
358         if (netconfNodeDataBroker == null) {
359             LOG.info("Mountpoint is slave mountpoint {}", mountPointNodeName);
360             return;
361         }
362
363         LOG.info("Databroker service 1:{} 2:{}", dataBroker.hashCode(), netconfNodeDataBroker.hashCode());
364         // getNodeInfoTest(dataBroker);
365
366         // create automatic empty maintenance entry into db before reading and listening for problems
367         this.maintenanceService.createIfNotExists(mountPointNodeName);
368
369         // Setup microwaveEventListener for Notificationservice
370
371         // MicrowaveEventListener microwaveEventListener = new
372         // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
373         // xmlMapper, databaseClientEvents);
374         ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName, dataBroker,
375                 webSocketService, databaseClientEvents, instanceIdentifier, netconfNodeDataBroker, dcaeProviderClient,
376                 aotsMProvider, maintenanceService, notificationDelayService);
377         networkElementRepresentations.put(mountPointNodeName, ne);
378         ne.doRegisterMicrowaveEventListener(mountPoint);
379
380         // Register netconf stream
381         registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
382
383         // -- Read data from NE
384         ne.initialReadFromNetworkElement();
385         ne.initSynchronizationExtension();
386
387         // Setup Service that monitors registration/ deregistration of session
388         odlEventListener.registration(mountPointNodeName);
389
390         if (aaiProviderClient != null) {
391             aaiProviderClient.onDeviceRegistered(mountPointNodeName);
392         }
393         // -- Register NE to performance manager
394         if (performanceManager != null) {
395             performanceManager.registration(mountPointNodeName, ne);
396         }
397
398         deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
399
400         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
401     }
402
403     // removeListenerOnNode
404     @Override
405     public void leaveConnectedState(NodeId nNodeId, NetconfNode nNode) {
406         String mountPointNodeName = nNodeId.getValue();
407         LOG.info("leaveConnectedState for device :: Name : {}", mountPointNodeName);
408
409         this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
410         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
411         if (ne != null) {
412             int problems = ne.removeAllCurrentProblemsOfNode();
413             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
414             if (odlEventListener != null) {
415                 odlEventListener.deRegistration(mountPointNodeName);
416             }
417             if (performanceManager != null) {
418                 performanceManager.deRegistration(mountPointNodeName);
419             }
420             if (aaiProviderClient != null) {
421                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
422             }
423         } else {
424             LOG.info("No related ne object for mountpoint {} to deregister .", mountPointNodeName);
425         }
426         if (deviceMonitor != null) {
427             deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
428         }
429
430     }
431
432     /*
433      * @Override public void mountpointNodeCreation(NodeId nNodeId, NetconfNode nNode) { String
434      * mountPointNodeName = nNodeId.getValue(); LOG.info("mountpointNodeCreation {} {}",
435      * nNodeId.getValue(), nNode.getConnectionStatus());
436      * deviceMonitor.createMountpointIndication(mountPointNodeName); }
437      */
438     @Override
439     public void mountpointNodeRemoved(NodeId nNodeId) {
440         String mountPointNodeName = nNodeId.getValue();
441         LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
442         deviceMonitor.removeMountpointIndication(mountPointNodeName);
443     }
444
445     /**
446      * Async RPC Interface implementation
447      */
448     @Override
449     public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
450             throws IllegalStateException {
451
452         if (this.databaseClientEvents == null) {
453             throw new IllegalStateException("dbEvents service not instantiated");
454         }
455
456         if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
457             throw new IllegalStateException("A clear task is already active");
458         } else {
459
460             // Create list of mountpoints if input is empty, using the content in ES
461             if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
462                 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
463             }
464
465             // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
466             // DeviceManager
467             final List<String> nodeNamesHandled = new ArrayList<>();
468             for (String mountpointName : nodeNamesInput) {
469                 LOG.info("Work with mountpoint {}", mountpointName);
470
471                 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
472
473                     // SDN Controller related alarms
474                     // -- can not be recreated on all nodes in connected state
475                     // -- would result in a DCAE/AAI Notification
476                     // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
477                     LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
478                     // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
479                     // nodeNamesHandled.add(mountpointName);
480
481                 } else {
482
483                     if (mountPointService != null) {
484                         InstanceIdentifier<Node> instanceIdentifier =
485                                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
486                         Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
487
488                         if (!optionalMountPoint.isPresent()) {
489                             LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
490                             this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
491                             nodeNamesHandled.add(mountpointName);
492                         } else {
493                             if (networkElementRepresentations.containsKey(mountpointName)) {
494                                 LOG.info("At node known mountpoint {}", mountpointName);
495                                 nodeNamesHandled.add(mountpointName);
496                             } else {
497                                 LOG.info("At node unknown mountpoint {}", mountpointName);
498                             }
499                         }
500                     }
501                 }
502             }
503
504             // Force a sync
505             if (this.deviceMonitor != null) {
506                 this.deviceMonitor.refreshAlarmsInDb();
507             }
508
509             threadDoClearCurrentFaultByNodename = new Thread(() -> {
510                 refreshCounter++;
511                 LOG.info("Start refresh mountpoint task {}", refreshCounter);
512                 // for(String nodeName:nodeNamesOutput) {
513                 for (String nodeName : nodeNamesHandled) {
514                     ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
515                     if (ne != null) {
516                         LOG.info("Refresh mountpoint {}", nodeName);
517                         ne.initialReadFromNetworkElement();
518                     } else {
519                         LOG.info("Unhandled mountpoint {}", nodeName);
520                     }
521                 }
522                 LOG.info("End refresh mountpoint task {}", refreshCounter);
523             });
524             threadDoClearCurrentFaultByNodename.start();
525             return nodeNamesHandled;
526         }
527     };
528
529     /**
530      * Indication if init() of devicemanager successfully done.
531      *
532      * @return true if init() was sucessfull. False if not done or not successfull.
533      */
534     public boolean isDevicemanagerInitializationOk() {
535         return this.devicemanagerInitializationOk;
536     }
537
538     /**
539      * Get initialization status of database.
540      *
541      * @return true if fully initialized false if not
542      */
543     public boolean isDatabaseInitializationFinished() {
544         return htDatabase == null ? false : htDatabase.getInitialized();
545     }
546
547     /*---------------------------------------------------------------------
548      * Private funtions
549      */
550
551     /**
552      * Do the stream creation for the device.
553      *
554      * @param mountPointNodeName
555      * @param mountPoint
556      */
557     private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
558
559         final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
560                 mountPoint.getService(RpcConsumerRegistry.class);
561         if (optionalRpcConsumerService.isPresent()) {
562             final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
563             final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
564             if (rpcService == null) {
565                 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
566             } else {
567                 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
568                         new CreateSubscriptionInputBuilder();
569                 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
570                 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
571                         mountPointNodeName);
572                 try {
573                     CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
574                     if (createSubscriptionInput == null) {
575                         LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
576                     } else {
577                         rpcService.createSubscription(createSubscriptionInput);
578                     }
579                 } catch (NullPointerException e) {
580                     LOG.warn("createSubscription failed");
581                 }
582             }
583         } else {
584             LOG.warn("No RpcConsumerRegistry avaialble.");
585         }
586
587     }
588
589     /**
590      * Get NE object
591      *
592      * @param mountpoint mount point name
593      * @return null or NE specific data
594      */
595     public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
596
597         return networkElementRepresentations.get(mountpoint);
598
599     }
600
601     /* -- LOG related functions -- */
602
603
604     private boolean isInClusterMode() {
605         return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
606     }
607
608     private String getClusterNetconfNodeName() {
609         return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
610     }
611
612     private boolean isMaster(NetconfNode nnode) {
613         if (isInClusterMode()) {
614             LOG.debug("check if me is responsible for node");
615             String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
616                     : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
617             /*
618              * List<NodeStatus> clusterNodeStatusList=nnode.getClusteredConnectionStatus()==null?null:nnode.
619              * getClusteredConnectionStatus().getNodeStatus(); if(clusterNodeStatusList!=null) { for(NodeStatus
620              * s: clusterNodeStatusList) LOG.debug("node "+s.getNode()+
621              * " with status "+(s.getStatus()==null?"null":s.getStatus().getName())); }
622              */
623             String myNodeName = getClusterNetconfNodeName();
624             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
625             if (!masterNodeName.equals(myNodeName)) {
626                 LOG.debug("netconf change but me is not master for this node");
627                 return false;
628             }
629         }
630         return true;
631     }
632
633 }