Fix malformed time stamp
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
19
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.archiveservice.ArchiveCleanService;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitor;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorEmptyImpl;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientDummyImpl;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
51 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
52 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
53 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
54 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
55 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
56 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
57 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
58 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
74 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
78 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
79
80 @SuppressWarnings("deprecation")
81 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
82
83     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
84     private static final String APPLICATION_NAME = "DeviceManager";
85     private static final String MYDBKEYNAMEBASE = "SDN-Controller";
86
87     // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
88     private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
89             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
90                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
91     @SuppressWarnings("unused")
92     private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
93     // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
94
95     private DataBroker dataBroker = null;
96     private MountPointService mountPointService = null;
97     private RpcProviderRegistry rpcProviderRegistry = null;
98     @SuppressWarnings("unused")
99     private NotificationPublishService notificationPublishService = null;
100     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
101
102     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
103             new ConcurrentHashMap<>();
104     private final ONFCoreNetworkElementRepresentation networkelementLock = ONFCoreNetworkElementFactory.getEmpty("NE-LOCK");
105     private WebSocketServiceClient webSocketService;
106     private HtDatabaseEventsService databaseClientEvents;
107     private ODLEventListener odlEventListener;
108     private NetconfChangeListener netconfChangeListener;
109     private DeviceManagerApiServiceImpl rpcApiService;
110     private @Nullable PerformanceManagerImpl performanceManager = null;
111     private ProviderClient dcaeProviderClient;
112     private ProviderClient aotsMProvider;
113     private @Nullable AaiProviderClient aaiProviderClient = null;
114     private @Nullable DeviceMonitor deviceMonitor = new DeviceMonitorEmptyImpl();
115     private IndexUpdateService updateService;
116     //private IndexConfigService configService; issue#1
117     private IndexMwtnService mwtnService;
118     private HtDatabaseNode htDatabase;
119     private Boolean devicemanagerInitializationOk = false;
120     private MaintenanceServiceImpl maintenanceService;
121     private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
122     private Thread threadDoClearCurrentFaultByNodename = null;
123     private int refreshCounter = 0;
124     private AkkaConfig akkaConfig;
125     private ArchiveCleanService archiveCleanService;
126     @SuppressWarnings("unused")
127         private ClusterSingletonServiceRegistration cssRegistration;
128
129     // Blueprint 1
130     public DeviceManagerImpl() {
131         LOG.info("Creating provider for {}", APPLICATION_NAME);
132     }
133
134     public void setDataBroker(DataBroker dataBroker) {
135         this.dataBroker = dataBroker;
136     }
137
138     public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
139         this.rpcProviderRegistry = rpcProviderRegistry;
140     }
141
142     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
143         this.notificationPublishService = notificationPublishService;
144     }
145
146     public void setMountPointService(MountPointService mountPointService) {
147         this.mountPointService = mountPointService;
148     }
149     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
150         this.clusterSingletonServiceProvider = clusterSingletonService;
151     }
152     public void init() {
153
154         LOG.info("Session Initiated start {}", APPLICATION_NAME);
155
156         // Start RPC Service
157         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
158         // Get configuration
159         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
160         try {
161             this.akkaConfig = AkkaConfig.load();
162             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
163         } catch (Exception e1) {
164             this.akkaConfig = null;
165             LOG.warn("problem loading akka.conf: " + e1.getMessage());
166         }
167         GeoConfig geoConfig = null;
168         if (akkaConfig != null && akkaConfig.isCluster()) {
169             LOG.info("cluster mode detected");
170             if (GeoConfig.fileExists()) {
171                 try {
172                     LOG.debug("try to load geoconfig");
173                     geoConfig = GeoConfig.load();
174                 } catch (Exception err) {
175                     LOG.warn("problem loading geoconfig: " + err.getMessage());
176                 }
177             } else {
178                 LOG.debug("no geoconfig file found");
179             }
180         } else {
181             LOG.info("single node mode detected");
182         }
183
184         this.notificationDelayService = new NotificationDelayService<>(config);
185
186         EsConfig dbConfig = config.getEs();
187         LOG.debug("esConfig=" + dbConfig.toString());
188         // Start database
189         htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
190         if (htDatabase == null) {
191             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
192         } else {
193             // init Database Values only if singleNode or clusterMember=1
194             if (akkaConfig == null || akkaConfig.isClusterAndFirstNode()) {
195                 // Create DB index if not existing and if database is running
196                 try {
197                     //this.configService = new IndexConfigService(htDatabase);issue#1
198                     this.mwtnService = new IndexMwtnService(htDatabase);
199                 } catch (Exception e) {
200                     LOG.warn("Can not start ES access clients to provide database index config, mwtn. ", e);
201                 }
202             }
203             // start service for device maintenance service
204             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
205
206             // Websockets
207             try {
208                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
209             } catch (Exception e) {
210                 LOG.error("Can not start websocket service. Loading mock class.", e);
211                 this.webSocketService = new WebSocketServiceClientDummyImpl();
212             }
213             // DCAE
214             this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
215
216             this.aaiProviderClient = new AaiProviderClient(config, this);
217             // EM
218             EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
219
220             if (emConfig == null) {
221                 LOG.warn("No configuration available. Don't start event manager");
222             } else {
223                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
224
225                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
226
227                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
228                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
229             }
230             this.archiveCleanService = new ArchiveCleanService(config, databaseClientEvents, mwtnService);
231             this.cssRegistration = this.clusterSingletonServiceProvider.registerClusterSingletonService(this.archiveCleanService);
232             // PM
233             PmConfig configurationPM = config.getPm();
234             LOG.info("Performance manager configuration: {}", configurationPM);
235             if (!configurationPM.isPerformanceManagerEnabled()) {
236
237                 LOG.info("No configuration available. Don't start performance manager");
238             } else {
239                 @Nullable
240                 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
241                 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
242                 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
243             }
244
245             // DUS (Database update service)
246             LOG.debug("start db update service");
247             this.updateService =
248                     new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
249             this.updateService.start();
250
251             // RPC Service for specific services
252             this.rpcApiService.setMaintenanceService(this.maintenanceService);
253             this.rpcApiService.setResyncListener(this);
254             // DM
255             // DeviceMonitor has to be available before netconfSubscriptionManager is
256             // configured
257             LOG.debug("start DeviceMonitor Service");
258             this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener, config);
259
260             // netconfSubscriptionManager should be the last one because this is a callback
261             // service
262             LOG.debug("start NetconfSubscriptionManager Service");
263             // this.netconfSubscriptionManager = new
264             // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
265             // this.netconfSubscriptionManager.register();
266             this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
267             this.netconfChangeListener.register();
268
269             this.devicemanagerInitializationOk = true;
270         }
271         LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
272     }
273
274     @Override
275     public void close() throws Exception {
276         LOG.info("DeviceManagerImpl closing ...");
277
278         close(performanceManager);
279         close(dcaeProviderClient);
280         close(aaiProviderClient);
281         close(aotsMProvider);
282         close(deviceMonitor);
283         //close(updateService, configService, mwtnService); issue#1
284         close(updateService, mwtnService);
285         close(htDatabase);
286         close(netconfChangeListener);
287         close(maintenanceService);
288         close(rpcApiService);
289         close(notificationDelayService);
290         close(archiveCleanService);
291         LOG.info("DeviceManagerImpl closing done");
292     }
293
294
295     /**
296      * Used to close all Services, that should support AutoCloseable Pattern
297      *
298      * @param toClose
299      * @throws Exception
300      */
301     private void close(AutoCloseable... toCloseList) throws Exception {
302         for (AutoCloseable element : toCloseList) {
303             if (element != null) {
304                 element.close();
305             }
306         }
307     }
308
309     /*-------------------------------------------------------------------------------------------
310      * Functions for interface DeviceManagerService
311      */
312
313     /**
314      * For each mounted device a mountpoint is created and this listener is called.
315      * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
316      * @param action provide action
317      * @param nNodeId id of the mountpoint
318      * @param nNode mountpoint contents
319      */
320     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
321
322         String mountPointNodeName = nNodeId.getValue();
323         LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
324
325         boolean preConditionMissing = false;
326         if (mountPointService == null) {
327             preConditionMissing = true;
328             LOG.warn("No mountservice available.");
329         }
330         if (!devicemanagerInitializationOk) {
331             preConditionMissing = true;
332             LOG.warn("Devicemanager initialization still pending.");
333         }
334         if (preConditionMissing) {
335             return;
336         }
337
338         if (!isNetconfNodeMaster(nNode)) {
339             // Change Devicemonitor-status to connected ... for non master mountpoints.
340             deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
341                 } else {
342
343                         InstanceIdentifier<Node> instanceIdentifier = NETCONF_TOPO_IID.child(Node.class,
344                                         new NodeKey(new NodeId(mountPointNodeName)));
345
346                         Optional<MountPoint> optionalMountPoint = null;
347                         int timeout = 10000;
348                         while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent()
349                                         && timeout > 0) {
350                                 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
351                                 sleepMs(1000);
352                                 timeout -= 1000;
353                         }
354
355                         if (!optionalMountPoint.isPresent()) {
356                                 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
357                                                 mountPointNodeName);
358                         } else {
359                                 // Mountpoint is present for sure
360                                 MountPoint mountPoint = optionalMountPoint.get();
361                                 // BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
362                                 LOG.info("Mountpoint with id: {} class {} toString {}", mountPoint.getIdentifier(),
363                                                 mountPoint.getClass().getName(), mountPoint);
364                                 Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
365
366                                 if (!optionalNetconfNodeDatabroker.isPresent()) {
367                                         LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
368                                 } else {
369
370                                         // It is master for mountpoint and all data are available.
371                                         // Make sure that specific mountPointNodeName is handled only once.
372                                         // be aware that startListenerOnNodeForConnectedState could be called multiple
373                                         // times for same mountPointNodeName.
374                                         // networkElementRepresentations contains handled NEs at master node.
375
376                                         synchronized (networkelementLock) {
377                                                 if (networkElementRepresentations.containsKey(mountPointNodeName)) {
378                                                         LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
379                                                         return;
380                                                 } else {
381                                                         ONFCoreNetworkElementRepresentation result = networkElementRepresentations.put(mountPointNodeName,
382                                                                         networkelementLock);
383                                                         if (result != null) {
384                                                                 LOG.info("Expected null value was not provided, but {}", result.getMountPointNodeName());
385                                                         }
386                                                 }
387                                         }
388
389                                         DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
390                                         LOG.info("Master mountpoint {}", mountPointNodeName);
391                                         // getNodeInfoTest(dataBroker);
392
393                                         // create automatic empty maintenance entry into db before reading and listening
394                                         // for problems
395                                         this.maintenanceService.createIfNotExists(mountPointNodeName);
396
397                                         // Setup microwaveEventListener for Notificationservice
398
399                                         // MicrowaveEventListener microwaveEventListener = new
400                                         // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
401                                         // xmlMapper, databaseClientEvents);
402                                         ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName,
403                                                         dataBroker, webSocketService, databaseClientEvents, instanceIdentifier,
404                                                         netconfNodeDataBroker, dcaeProviderClient, aotsMProvider, maintenanceService,
405                                                         notificationDelayService);
406
407                                         synchronized (networkelementLock) {
408                                                 ONFCoreNetworkElementRepresentation result = networkElementRepresentations
409                                                                 .put(mountPointNodeName, ne);
410                                                 if (result != networkelementLock) {
411                                                         LOG.info("NE list does not provide lock as epxected, but {}.",
412                                                                         result.getMountPointNodeName());
413                                                 }
414                                         }
415                                         ne.doRegisterMicrowaveEventListener(mountPoint);
416
417                                         // Register netconf stream
418                                         registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
419
420                                         // -- Read data from NE
421                                         ne.initialReadFromNetworkElement();
422
423                                         sendUpdateNotification(mountPointNodeName, nNode.getConnectionStatus());
424
425                                         if (aaiProviderClient != null) {
426                                                 aaiProviderClient.onDeviceRegistered(mountPointNodeName);
427                                         }
428                                         // -- Register NE to performance manager
429                                         if (performanceManager != null) {
430                                                 performanceManager.registration(mountPointNodeName, ne);
431                                         }
432
433                                         deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
434
435                                         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
436                                 }
437                         }
438                 }
439         }
440
441     /**
442      * Mountpoint created or existing. Managed device is actually disconnected from node/ mountpoint.
443      * Origin state: Connecting, Connected
444      * Target state: are UnableToConnect or Connecting
445      * @param action create or update
446      * @param nNodeId id of the mountpoint
447      * @param nNode mountpoint contents
448      */
449     public void enterNonConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
450         String mountPointNodeName = nNodeId.getValue();
451         ConnectionStatus csts = nNode.getConnectionStatus();
452         if (isNetconfNodeMaster(nNode)) {
453                 sendUpdateNotification(mountPointNodeName, csts);
454         }
455
456         // Handling if mountpoint exist. connected -> connecting/UnableToConnect
457         stopListenerOnNodeForConnectedState(mountPointNodeName);
458
459         deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
460
461     }
462
463     /**
464      * Mountpoint removed indication.
465      * @param nNodeId id of the mountpoint
466      */
467     public void removeMountpointState(NodeId nNodeId) {
468         String mountPointNodeName = nNodeId.getValue();
469         LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
470
471         stopListenerOnNodeForConnectedState(mountPointNodeName);
472         deviceMonitor.removeMountpointIndication(mountPointNodeName);
473         if (odlEventListener != null) {
474             odlEventListener.deRegistration(mountPointNodeName);
475         }
476     }
477
478     /**
479      * Do all tasks necessary to move from mountpoint state connected -> connecting
480      * @param mountPointNodeName provided
481      * @param ne representing the device connected to mountpoint
482      */
483     private void stopListenerOnNodeForConnectedState( String mountPointNodeName) {
484         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
485         if (ne != null) {
486             this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
487             int problems = ne.removeAllCurrentProblemsOfNode();
488             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
489             if (performanceManager != null) {
490                 performanceManager.deRegistration(mountPointNodeName);
491             }
492             if (aaiProviderClient != null) {
493                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
494             }
495         }
496     }
497
498     private void sendUpdateNotification(String mountPointNodeName, ConnectionStatus csts) {
499         LOG.info("enter Non ConnectedState for device :: Name : {} ConnectionStatus {}", mountPointNodeName, csts);
500         if (odlEventListener != null) {
501                 odlEventListener.updateRegistration(mountPointNodeName, ConnectionStatus.class.getSimpleName(), csts != null ? csts.getName() : "null");
502         }
503     }
504
505     /**
506      * Handle netconf/mountpoint changes
507      */
508     @Override
509     public void netconfChangeHandler(Action action, @Nullable ConnectionStatus csts, NodeId nodeId, NetconfNode nNode) {
510
511                 ClusteredConnectionStatus ccsts = nNode.getClusteredConnectionStatus();
512                 String nodeIdString = nodeId.getValue();
513                 if (action == Action.CREATE) {
514                 if (odlEventListener != null) {
515                         odlEventListener.registration(nodeIdString);
516                 }
517                 }
518                 boolean isCluster = akkaConfig == null && akkaConfig.isCluster();
519                 if (isCluster && ccsts == null) {
520                         LOG.debug("NETCONF Node {} {} does not provide cluster status. Stop execution.", nodeIdString, action);
521                 } else {
522                         switch (action) {
523                         case REMOVE:
524                                 removeMountpointState(nodeId); // Stop Monitor
525                                 break;
526
527                         case UPDATE:
528                         case CREATE:
529                                 if (csts != null) {
530                                         switch (csts) {
531                                         case Connected: {
532                                                 startListenerOnNodeForConnectedState(action, nodeId, nNode);
533                                                 break;
534                                         }
535                                         case UnableToConnect:
536                                         case Connecting: {
537                                                 enterNonConnectedState(action, nodeId, nNode);
538                                                 break;
539                                         }
540                                         }
541                                 } else {
542                                         LOG.debug("NETCONF Node handled with null status for action", action);
543                                 }
544                                 break;
545                         }
546                 }
547     }
548
549     /*-------------------------------------------------------------------------------------------
550      * Functions
551      */
552
553     public ArchiveCleanService getArchiveCleanService() {
554         return this.archiveCleanService;
555     }
556
557     public HtDatabaseEventsService getDatabaseClientEvents() {
558         return databaseClientEvents;
559     }
560
561     public IndexMwtnService getMwtnService() {
562         return mwtnService;
563     }
564
565     /**
566      * Async RPC Interface implementation
567      */
568     @Override
569     public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
570             throws IllegalStateException {
571
572         if (this.databaseClientEvents == null) {
573             throw new IllegalStateException("dbEvents service not instantiated");
574         }
575
576         if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
577             throw new IllegalStateException("A clear task is already active");
578         } else {
579
580             // Create list of mountpoints if input is empty, using the content in ES
581             if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
582                 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
583             }
584
585             // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
586             // DeviceManager
587             final List<String> nodeNamesHandled = new ArrayList<>();
588             for (String mountpointName : nodeNamesInput) {
589                 LOG.info("Work with mountpoint {}", mountpointName);
590
591                 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
592
593                     // SDN Controller related alarms
594                     // -- can not be recreated on all nodes in connected state
595                     // -- would result in a DCAE/AAI Notification
596                     // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
597                     LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
598                     // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
599                     // nodeNamesHandled.add(mountpointName);
600
601                 } else {
602
603                     if (mountPointService != null) {
604                         InstanceIdentifier<Node> instanceIdentifier =
605                                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
606                         Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
607
608                         if (!optionalMountPoint.isPresent()) {
609                             LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
610                             this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
611                             nodeNamesHandled.add(mountpointName);
612                         } else {
613                             if (networkElementRepresentations.containsKey(mountpointName)) {
614                                 LOG.info("At node known mountpoint {}", mountpointName);
615                                 nodeNamesHandled.add(mountpointName);
616                             } else {
617                                 LOG.info("At node unknown mountpoint {}", mountpointName);
618                             }
619                         }
620                     }
621                 }
622             }
623
624             // Force a sync
625             this.deviceMonitor.refreshAlarmsInDb();
626
627             threadDoClearCurrentFaultByNodename = new Thread(() -> {
628                 refreshCounter++;
629                 LOG.info("Start refresh mountpoint task {}", refreshCounter);
630                 // for(String nodeName:nodeNamesOutput) {
631                 for (String nodeName : nodeNamesHandled) {
632                     ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
633                     if (ne != null) {
634                         LOG.info("Refresh mountpoint {}", nodeName);
635                         ne.initialReadFromNetworkElement();
636                     } else {
637                         LOG.info("Unhandled mountpoint {}", nodeName);
638                     }
639                 }
640                 LOG.info("End refresh mountpoint task {}", refreshCounter);
641             });
642             threadDoClearCurrentFaultByNodename.start();
643             return nodeNamesHandled;
644         }
645     };
646
647     /**
648      * Indication if init() of devicemanager successfully done.
649      *
650      * @return true if init() was sucessfull. False if not done or not successfull.
651      */
652     public boolean isDevicemanagerInitializationOk() {
653         return this.devicemanagerInitializationOk;
654     }
655
656     /**
657      * Get initialization status of database.
658      *
659      * @return true if fully initialized false if not
660      */
661     public boolean isDatabaseInitializationFinished() {
662         return htDatabase == null ? false : htDatabase.getInitialized();
663     }
664
665     /*---------------------------------------------------------------------
     * Private functions
667      */
668
669     /**
670      * Do the stream creation for the device.
671      *
672      * @param mountPointNodeName
673      * @param mountPoint
674      */
675     private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
676
677         final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
678                 mountPoint.getService(RpcConsumerRegistry.class);
679         if (optionalRpcConsumerService.isPresent()) {
680             final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
681             final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
682             if (rpcService == null) {
683                 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
684             } else {
685                 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
686                         new CreateSubscriptionInputBuilder();
687                 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
688                 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
689                         mountPointNodeName);
690                 try {
691                     CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
692                     if (createSubscriptionInput == null) {
693                         LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
694                     } else {
695                         rpcService.createSubscription(createSubscriptionInput);
696                     }
697                 } catch (NullPointerException e) {
698                     LOG.warn("createSubscription failed");
699                 }
700             }
701         } else {
702             LOG.warn("No RpcConsumerRegistry avaialble.");
703         }
704
705     }
706
707     /**
708      * Get NE object
709      *
710      * @param mountpoint mount point name
711      * @return null or NE specific data
712      */
713     public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
714
715         return networkElementRepresentations.get(mountpoint);
716
717     }
718
719     /* -- LOG related functions -- */
720
721
722     private boolean isInClusterMode() {
723         return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
724     }
725
726     private String getClusterNetconfNodeName() {
727         return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
728     }
729
730     private boolean isNetconfNodeMaster(NetconfNode nnode) {
731         if (isInClusterMode()) {
732             LOG.debug("check if me is responsible for node");
733             String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
734                     : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
735             String myNodeName = getClusterNetconfNodeName();
736             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
737             if (!masterNodeName.equals(myNodeName)) {
738                 LOG.debug("netconf change but me is not master for this node");
739                 return false;
740             }
741         }
742         return true;
743     }
744
745
746     private void sleepMs(int milliseconds) {
747         try {
748             Thread.sleep(milliseconds);
749         } catch (InterruptedException e) {
750             LOG.debug("Interrupted sleep");
751             // Restore interrupted state...
752             Thread.currentThread().interrupt();
753         }
754     }
755
756 }