SDNR WT configure alarm severity
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
19
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.archiveservice.ArchiveCleanService;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitor;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorEmptyImpl;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientDummyImpl;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
51 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
52 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
53 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
54 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
55 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
56 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
57 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
58 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
79 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
80
81 @SuppressWarnings("deprecation")
82 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
83
84     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
85     private static final String APPLICATION_NAME = "DeviceManager";
86     private static final String MYDBKEYNAMEBASE = "SDN-Controller";
87
88     // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
89     private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
90             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
91                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
92     @SuppressWarnings("unused")
93     private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
94     // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
95
96     private DataBroker dataBroker = null;
97     private MountPointService mountPointService = null;
98     private RpcProviderRegistry rpcProviderRegistry = null;
99     @SuppressWarnings("unused")
100     private NotificationPublishService notificationPublishService = null;
101     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
102
103     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
104             new ConcurrentHashMap<>();
105
106     private WebSocketServiceClient webSocketService;
107     private HtDatabaseEventsService databaseClientEvents;
108     private ODLEventListener odlEventListener;
109     private NetconfChangeListener netconfChangeListener;
110     private DeviceManagerApiServiceImpl rpcApiService;
111     private @Nullable PerformanceManagerImpl performanceManager = null;
112     private ProviderClient dcaeProviderClient;
113     private ProviderClient aotsMProvider;
114     private @Nullable AaiProviderClient aaiProviderClient = null;
115     private @Nullable DeviceMonitor deviceMonitor = new DeviceMonitorEmptyImpl();
116     private IndexUpdateService updateService;
117     private IndexConfigService configService;
118     private IndexMwtnService mwtnService;
119     private HtDatabaseNode htDatabase;
120     private Boolean devicemanagerInitializationOk = false;
121     private MaintenanceServiceImpl maintenanceService;
122     private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
123     private Thread threadDoClearCurrentFaultByNodename = null;
124     private int refreshCounter = 0;
125     private AkkaConfig akkaConfig;
126     private ArchiveCleanService archiveCleanService;
127     @SuppressWarnings("unused")
128         private ClusterSingletonServiceRegistration cssRegistration;
129
130     // Blueprint 1
131     public DeviceManagerImpl() {
132         LOG.info("Creating provider for {}", APPLICATION_NAME);
133     }
134
135     public void setDataBroker(DataBroker dataBroker) {
136         this.dataBroker = dataBroker;
137     }
138
139     public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
140         this.rpcProviderRegistry = rpcProviderRegistry;
141     }
142
143     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
144         this.notificationPublishService = notificationPublishService;
145     }
146
147     public void setMountPointService(MountPointService mountPointService) {
148         this.mountPointService = mountPointService;
149     }
150     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
151         this.clusterSingletonServiceProvider = clusterSingletonService;
152     }
153     public void init() {
154
155         LOG.info("Session Initiated start {}", APPLICATION_NAME);
156
157         // Start RPC Service
158         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
159         // Get configuration
160         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
161         try {
162             this.akkaConfig = AkkaConfig.load();
163             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
164         } catch (Exception e1) {
165             this.akkaConfig = null;
166             LOG.warn("problem loading akka.conf: " + e1.getMessage());
167         }
168         GeoConfig geoConfig = null;
169         if (akkaConfig != null && akkaConfig.isCluster()) {
170             LOG.info("cluster mode detected");
171             if (GeoConfig.fileExists()) {
172                 try {
173                     LOG.debug("try to load geoconfig");
174                     geoConfig = GeoConfig.load();
175                 } catch (Exception err) {
176                     LOG.warn("problem loading geoconfig: " + err.getMessage());
177                 }
178             } else {
179                 LOG.debug("no geoconfig file found");
180             }
181         } else {
182             LOG.info("single node mode detected");
183         }
184
185         this.notificationDelayService = new NotificationDelayService<>(config);
186
187         EsConfig dbConfig = config.getEs();
188         LOG.debug("esConfig=" + dbConfig.toString());
189         // Start database
190         htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
191         if (htDatabase == null) {
192             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
193         } else {
194             // init Database Values only if singleNode or clusterMember=1
195             if (akkaConfig == null || akkaConfig.isClusterAndFirstNode()) {
196                 // Create DB index if not existing and if database is running
197                 try {
198                     this.configService = new IndexConfigService(htDatabase);
199                     this.mwtnService = new IndexMwtnService(htDatabase);
200                 } catch (Exception e) {
201                     LOG.warn("Can not start ES access clients to provide database index config, mwtn. ", e);
202                 }
203             }
204             // start service for device maintenance service
205             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
206
207             // Websockets
208             try {
209                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
210             } catch (Exception e) {
211                 LOG.error("Can not start websocket service. Loading mock class.", e);
212                 this.webSocketService = new WebSocketServiceClientDummyImpl();
213             }
214             // DCAE
215             this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
216
217             this.aaiProviderClient = new AaiProviderClient(config, this);
218             // EM
219             EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
220
221             if (emConfig == null) {
222                 LOG.warn("No configuration available. Don't start event manager");
223             } else {
224                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
225
226                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
227
228                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
229                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
230             }
231             this.archiveCleanService = new ArchiveCleanService(config, databaseClientEvents, mwtnService);
232             this.cssRegistration = this.clusterSingletonServiceProvider.registerClusterSingletonService(this.archiveCleanService);
233             // PM
234             PmConfig configurationPM = config.getPm();
235             LOG.info("Performance manager configuration: {}", configurationPM);
236             if (!configurationPM.isPerformanceManagerEnabled()) {
237
238                 LOG.info("No configuration available. Don't start performance manager");
239             } else {
240                 @Nullable
241                 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
242                 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
243                 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
244             }
245
246             // DUS (Database update service)
247             LOG.debug("start db update service");
248             this.updateService =
249                     new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
250             this.updateService.start();
251
252             // RPC Service for specific services
253             this.rpcApiService.setMaintenanceService(this.maintenanceService);
254             this.rpcApiService.setResyncListener(this);
255             // DM
256             // DeviceMonitor has to be available before netconfSubscriptionManager is
257             // configured
258             LOG.debug("start DeviceMonitor Service");
259             this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener, config);
260
261             // netconfSubscriptionManager should be the last one because this is a callback
262             // service
263             LOG.debug("start NetconfSubscriptionManager Service");
264             // this.netconfSubscriptionManager = new
265             // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
266             // this.netconfSubscriptionManager.register();
267             this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
268             this.netconfChangeListener.register();
269
270             this.devicemanagerInitializationOk = true;
271         }
272         LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
273     }
274
275     @Override
276     public void close() throws Exception {
277         LOG.info("DeviceManagerImpl closing ...");
278
279         close(performanceManager);
280         close(dcaeProviderClient);
281         close(aaiProviderClient);
282         close(aotsMProvider);
283         close(deviceMonitor);
284         close(updateService, configService, mwtnService);
285         close(htDatabase);
286         close(netconfChangeListener);
287         close(maintenanceService);
288         close(rpcApiService);
289         close(notificationDelayService);
290         close(archiveCleanService);
291         LOG.info("DeviceManagerImpl closing done");
292     }
293
294
295     /**
296      * Used to close all Services, that should support AutoCloseable Pattern
297      *
298      * @param toClose
299      * @throws Exception
300      */
301     private void close(AutoCloseable... toCloseList) throws Exception {
302         for (AutoCloseable element : toCloseList) {
303             if (element != null) {
304                 element.close();
305             }
306         }
307     }
308
309     /*-------------------------------------------------------------------------------------------
310      * Functions for interface DeviceManagerService
311      */
312
313     /**
314      * For each mounted device a mountpoint is created and this listener is called.
315      * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
316      * @param action provide action
317      * @param nNodeId id of the mountpoint
318      * @param nNode mountpoint contents
319      */
320     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
321
322         String mountPointNodeName = nNodeId.getValue();
323         LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
324
325         boolean preConditionMissing = false;
326         if (mountPointService == null) {
327             preConditionMissing = true;
328             LOG.warn("No mountservice available.");
329         }
330         if (!devicemanagerInitializationOk) {
331             preConditionMissing = true;
332             LOG.warn("Devicemanager initialization still pending.");
333         }
334         if (preConditionMissing) {
335             return;
336         }
337
338         if (networkElementRepresentations.containsKey(mountPointNodeName)) {
339             LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
340             return;
341         }
342
343         if (!isNetconfNodeMaster(nNode)) {
344             // Change Devicemonitor-status to connected ... for non master mountpoints.
345             deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
346             return;
347         }
348
349         InstanceIdentifier<Node> instanceIdentifier =
350                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountPointNodeName)));
351
352         Optional<MountPoint> optionalMountPoint = null;
353         int timeout = 10000;
354         while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent() && timeout > 0) {
355             LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
356             sleepMs(1000);
357             timeout -= 1000;
358         }
359
360         if (!optionalMountPoint.isPresent()) {
361             LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
362                     mountPointNodeName);
363             return;
364         }
365         // Mountpoint is present for sure
366         MountPoint mountPoint = optionalMountPoint.get();
367         //BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
368         LOG.info("Mountpoint with id: {} class {} toString {}", mountPoint.getIdentifier(), mountPoint.getClass().getName(), mountPoint);
369         Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
370
371         if (! optionalNetconfNodeDatabroker.isPresent()) {
372             LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
373             return;
374         }
375
376         DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
377         LOG.info("Master mountpoint {}", mountPointNodeName);
378         // getNodeInfoTest(dataBroker);
379
380         // create automatic empty maintenance entry into db before reading and listening for problems
381         this.maintenanceService.createIfNotExists(mountPointNodeName);
382
383         // Setup microwaveEventListener for Notificationservice
384
385         // MicrowaveEventListener microwaveEventListener = new
386         // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
387         // xmlMapper, databaseClientEvents);
388         ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName, dataBroker,
389                 webSocketService, databaseClientEvents, instanceIdentifier, netconfNodeDataBroker, dcaeProviderClient,
390                 aotsMProvider, maintenanceService, notificationDelayService);
391         networkElementRepresentations.put(mountPointNodeName, ne);
392         ne.doRegisterMicrowaveEventListener(mountPoint);
393
394         // Register netconf stream
395         registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
396
397         // -- Read data from NE
398         ne.initialReadFromNetworkElement();
399         ne.initSynchronizationExtension();
400
401
402         sendUpdateNotification(mountPointNodeName, nNode.getConnectionStatus());
403
404         if (aaiProviderClient != null) {
405             aaiProviderClient.onDeviceRegistered(mountPointNodeName);
406         }
407         // -- Register NE to performance manager
408         if (performanceManager != null) {
409             performanceManager.registration(mountPointNodeName, ne);
410         }
411
412         deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
413
414         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
415     }
416
417     /**
418      * Mountpoint created or existing. Managed device is actually disconnected from node/ mountpoint.
419      * Origin state: Connecting, Connected
420      * Target state: are UnableToConnect or Connecting
421      * @param action create or update
422      * @param nNodeId id of the mountpoint
423      * @param nNode mountpoint contents
424      */
425     public void enterNonConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
426         String mountPointNodeName = nNodeId.getValue();
427         ConnectionStatus csts = nNode.getConnectionStatus();
428         if (isNetconfNodeMaster(nNode)) {
429                 sendUpdateNotification(mountPointNodeName, csts);
430         }
431
432         // Handling if mountpoint exist. connected -> connecting/UnableToConnect
433         stopListenerOnNodeForConnectedState(mountPointNodeName);
434
435         deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
436
437     }
438
439     /**
440      * Mountpoint removed indication.
441      * @param nNodeId id of the mountpoint
442      */
443     public void removeMountpointState(NodeId nNodeId) {
444         String mountPointNodeName = nNodeId.getValue();
445         LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
446
447         stopListenerOnNodeForConnectedState(mountPointNodeName);
448         deviceMonitor.removeMountpointIndication(mountPointNodeName);
449         if (odlEventListener != null) {
450             odlEventListener.deRegistration(mountPointNodeName);
451         }
452     }
453
454     /**
455      * Do all tasks necessary to move from mountpoint state connected -> connecting
456      * @param mountPointNodeName provided
457      * @param ne representing the device connected to mountpoint
458      */
459     private void stopListenerOnNodeForConnectedState( String mountPointNodeName) {
460         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
461         if (ne != null) {
462             this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
463             int problems = ne.removeAllCurrentProblemsOfNode();
464             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
465             if (performanceManager != null) {
466                 performanceManager.deRegistration(mountPointNodeName);
467             }
468             if (aaiProviderClient != null) {
469                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
470             }
471         }
472     }
473
474     private void sendUpdateNotification(String mountPointNodeName, ConnectionStatus csts) {
475         LOG.info("enter Non ConnectedState for device :: Name : {} ConnectionStatus {}", mountPointNodeName, csts);
476         if (odlEventListener != null) {
477                 odlEventListener.updateRegistration(mountPointNodeName, ConnectionStatus.class.getSimpleName(), csts != null ? csts.getName() : "null");
478         }
479     }
480
481     /**
482      * Handle netconf/mountpoint changes
483      */
484     @Override
485     public void netconfChangeHandler(Action action, @Nullable ConnectionStatus csts, NodeId nodeId, NetconfNode nNode) {
486
487                 ClusteredConnectionStatus ccsts = nNode.getClusteredConnectionStatus();
488                 String nodeIdString = nodeId.getValue();
489                 if (action == Action.CREATE) {
490                 if (odlEventListener != null) {
491                         odlEventListener.registration(nodeIdString);
492                 }
493                 }
494                 boolean isCluster = akkaConfig == null && akkaConfig.isCluster();
495                 if (isCluster && ccsts == null) {
496                         LOG.debug("NETCONF Node {} {} does not provide cluster status. Stop execution.", nodeIdString, action);
497                 } else {
498                         switch (action) {
499                         case REMOVE:
500                                 removeMountpointState(nodeId); // Stop Monitor
501                                 break;
502
503                         case UPDATE:
504                         case CREATE:
505                                 if (csts != null) {
506                                         switch (csts) {
507                                         case Connected: {
508                                                 startListenerOnNodeForConnectedState(action, nodeId, nNode);
509                                                 break;
510                                         }
511                                         case UnableToConnect:
512                                         case Connecting: {
513                                                 enterNonConnectedState(action, nodeId, nNode);
514                                                 break;
515                                         }
516                                         }
517                                 } else {
518                                         LOG.debug("NETCONF Node handled with null status for action", action);
519                                 }
520                                 break;
521                         }
522                 }
523     }
524
525     /*-------------------------------------------------------------------------------------------
526      * Functions
527      */
528
529     public ArchiveCleanService getArchiveCleanService() {
530         return this.archiveCleanService;
531     }
532
533     public HtDatabaseEventsService getDatabaseClientEvents() {
534         return databaseClientEvents;
535     }
536
537     public IndexMwtnService getMwtnService() {
538         return mwtnService;
539     }
540
541     /**
542      * Async RPC Interface implementation
543      */
544     @Override
545     public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
546             throws IllegalStateException {
547
548         if (this.databaseClientEvents == null) {
549             throw new IllegalStateException("dbEvents service not instantiated");
550         }
551
552         if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
553             throw new IllegalStateException("A clear task is already active");
554         } else {
555
556             // Create list of mountpoints if input is empty, using the content in ES
557             if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
558                 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
559             }
560
561             // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
562             // DeviceManager
563             final List<String> nodeNamesHandled = new ArrayList<>();
564             for (String mountpointName : nodeNamesInput) {
565                 LOG.info("Work with mountpoint {}", mountpointName);
566
567                 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
568
569                     // SDN Controller related alarms
570                     // -- can not be recreated on all nodes in connected state
571                     // -- would result in a DCAE/AAI Notification
572                     // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
573                     LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
574                     // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
575                     // nodeNamesHandled.add(mountpointName);
576
577                 } else {
578
579                     if (mountPointService != null) {
580                         InstanceIdentifier<Node> instanceIdentifier =
581                                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
582                         Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
583
584                         if (!optionalMountPoint.isPresent()) {
585                             LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
586                             this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
587                             nodeNamesHandled.add(mountpointName);
588                         } else {
589                             if (networkElementRepresentations.containsKey(mountpointName)) {
590                                 LOG.info("At node known mountpoint {}", mountpointName);
591                                 nodeNamesHandled.add(mountpointName);
592                             } else {
593                                 LOG.info("At node unknown mountpoint {}", mountpointName);
594                             }
595                         }
596                     }
597                 }
598             }
599
600             // Force a sync
601             this.deviceMonitor.refreshAlarmsInDb();
602
603             threadDoClearCurrentFaultByNodename = new Thread(() -> {
604                 refreshCounter++;
605                 LOG.info("Start refresh mountpoint task {}", refreshCounter);
606                 // for(String nodeName:nodeNamesOutput) {
607                 for (String nodeName : nodeNamesHandled) {
608                     ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
609                     if (ne != null) {
610                         LOG.info("Refresh mountpoint {}", nodeName);
611                         ne.initialReadFromNetworkElement();
612                     } else {
613                         LOG.info("Unhandled mountpoint {}", nodeName);
614                     }
615                 }
616                 LOG.info("End refresh mountpoint task {}", refreshCounter);
617             });
618             threadDoClearCurrentFaultByNodename.start();
619             return nodeNamesHandled;
620         }
621     };
622
623     /**
624      * Indication if init() of devicemanager successfully done.
625      *
626      * @return true if init() was sucessfull. False if not done or not successfull.
627      */
628     public boolean isDevicemanagerInitializationOk() {
629         return this.devicemanagerInitializationOk;
630     }
631
632     /**
633      * Get initialization status of database.
634      *
635      * @return true if fully initialized false if not
636      */
637     public boolean isDatabaseInitializationFinished() {
638         return htDatabase == null ? false : htDatabase.getInitialized();
639     }
640
    /*---------------------------------------------------------------------
     * Private functions
     */
644
645     /**
646      * Do the stream creation for the device.
647      *
648      * @param mountPointNodeName
649      * @param mountPoint
650      */
651     private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
652
653         final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
654                 mountPoint.getService(RpcConsumerRegistry.class);
655         if (optionalRpcConsumerService.isPresent()) {
656             final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
657             final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
658             if (rpcService == null) {
659                 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
660             } else {
661                 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
662                         new CreateSubscriptionInputBuilder();
663                 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
664                 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
665                         mountPointNodeName);
666                 try {
667                     CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
668                     if (createSubscriptionInput == null) {
669                         LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
670                     } else {
671                         rpcService.createSubscription(createSubscriptionInput);
672                     }
673                 } catch (NullPointerException e) {
674                     LOG.warn("createSubscription failed");
675                 }
676             }
677         } else {
678             LOG.warn("No RpcConsumerRegistry avaialble.");
679         }
680
681     }
682
683     /**
684      * Get NE object
685      *
686      * @param mountpoint mount point name
687      * @return null or NE specific data
688      */
689     public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
690
691         return networkElementRepresentations.get(mountpoint);
692
693     }
694
695     /* -- LOG related functions -- */
696
697
698     private boolean isInClusterMode() {
699         return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
700     }
701
702     private String getClusterNetconfNodeName() {
703         return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
704     }
705
706     private boolean isNetconfNodeMaster(NetconfNode nnode) {
707         if (isInClusterMode()) {
708             LOG.debug("check if me is responsible for node");
709             String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
710                     : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
711             String myNodeName = getClusterNetconfNodeName();
712             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
713             if (!masterNodeName.equals(myNodeName)) {
714                 LOG.debug("netconf change but me is not master for this node");
715                 return false;
716             }
717         }
718         return true;
719     }
720
721
722     private void sleepMs(int milliseconds) {
723         try {
724             Thread.sleep(milliseconds);
725         } catch (InterruptedException e) {
726             LOG.debug("Interrupted sleep");
727             // Restore interrupted state...
728             Thread.currentThread().interrupt();
729         }
730     }
731
732 }