SDNR Harden concurrent stream registration
[ccsdk/features.git] / sdnr / wt / devicemanager / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / devicemanager / impl / DeviceManagerImpl.java
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
19
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.archiveservice.ArchiveCleanService;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitor;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorEmptyImpl;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientDummyImpl;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
51 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
52 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
53 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
54 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
55 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
56 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
57 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
58 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
79 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
80
81 @SuppressWarnings("deprecation")
82 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
83
84     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
85     private static final String APPLICATION_NAME = "DeviceManager";
86     private static final String MYDBKEYNAMEBASE = "SDN-Controller";
87
88     // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
89     private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
90             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
91                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
92     @SuppressWarnings("unused")
93     private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
94     // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
95
96     private DataBroker dataBroker = null;
97     private MountPointService mountPointService = null;
98     private RpcProviderRegistry rpcProviderRegistry = null;
99     @SuppressWarnings("unused")
100     private NotificationPublishService notificationPublishService = null;
101     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
102
103     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
104             new ConcurrentHashMap<>();
105     private final ONFCoreNetworkElementRepresentation networkelementLock = ONFCoreNetworkElementFactory.getEmpty("NE-LOCK");
106     private WebSocketServiceClient webSocketService;
107     private HtDatabaseEventsService databaseClientEvents;
108     private ODLEventListener odlEventListener;
109     private NetconfChangeListener netconfChangeListener;
110     private DeviceManagerApiServiceImpl rpcApiService;
111     private @Nullable PerformanceManagerImpl performanceManager = null;
112     private ProviderClient dcaeProviderClient;
113     private ProviderClient aotsMProvider;
114     private @Nullable AaiProviderClient aaiProviderClient = null;
115     private @Nullable DeviceMonitor deviceMonitor = new DeviceMonitorEmptyImpl();
116     private IndexUpdateService updateService;
117     private IndexConfigService configService;
118     private IndexMwtnService mwtnService;
119     private HtDatabaseNode htDatabase;
120     private Boolean devicemanagerInitializationOk = false;
121     private MaintenanceServiceImpl maintenanceService;
122     private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
123     private Thread threadDoClearCurrentFaultByNodename = null;
124     private int refreshCounter = 0;
125     private AkkaConfig akkaConfig;
126     private ArchiveCleanService archiveCleanService;
127     @SuppressWarnings("unused")
128         private ClusterSingletonServiceRegistration cssRegistration;
129
130     // Blueprint 1
131     public DeviceManagerImpl() {
132         LOG.info("Creating provider for {}", APPLICATION_NAME);
133     }
134
135     public void setDataBroker(DataBroker dataBroker) {
136         this.dataBroker = dataBroker;
137     }
138
139     public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
140         this.rpcProviderRegistry = rpcProviderRegistry;
141     }
142
143     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
144         this.notificationPublishService = notificationPublishService;
145     }
146
147     public void setMountPointService(MountPointService mountPointService) {
148         this.mountPointService = mountPointService;
149     }
150     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
151         this.clusterSingletonServiceProvider = clusterSingletonService;
152     }
153     public void init() {
154
155         LOG.info("Session Initiated start {}", APPLICATION_NAME);
156
157         // Start RPC Service
158         this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
159         // Get configuration
160         HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
161         try {
162             this.akkaConfig = AkkaConfig.load();
163             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
164         } catch (Exception e1) {
165             this.akkaConfig = null;
166             LOG.warn("problem loading akka.conf: " + e1.getMessage());
167         }
168         GeoConfig geoConfig = null;
169         if (akkaConfig != null && akkaConfig.isCluster()) {
170             LOG.info("cluster mode detected");
171             if (GeoConfig.fileExists()) {
172                 try {
173                     LOG.debug("try to load geoconfig");
174                     geoConfig = GeoConfig.load();
175                 } catch (Exception err) {
176                     LOG.warn("problem loading geoconfig: " + err.getMessage());
177                 }
178             } else {
179                 LOG.debug("no geoconfig file found");
180             }
181         } else {
182             LOG.info("single node mode detected");
183         }
184
185         this.notificationDelayService = new NotificationDelayService<>(config);
186
187         EsConfig dbConfig = config.getEs();
188         LOG.debug("esConfig=" + dbConfig.toString());
189         // Start database
190         htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
191         if (htDatabase == null) {
192             LOG.error("Can only run with local database. Stop initialization of devicemanager.");
193         } else {
194             // init Database Values only if singleNode or clusterMember=1
195             if (akkaConfig == null || akkaConfig.isClusterAndFirstNode()) {
196                 // Create DB index if not existing and if database is running
197                 try {
198                     this.configService = new IndexConfigService(htDatabase);
199                     this.mwtnService = new IndexMwtnService(htDatabase);
200                 } catch (Exception e) {
201                     LOG.warn("Can not start ES access clients to provide database index config, mwtn. ", e);
202                 }
203             }
204             // start service for device maintenance service
205             this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
206
207             // Websockets
208             try {
209                 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
210             } catch (Exception e) {
211                 LOG.error("Can not start websocket service. Loading mock class.", e);
212                 this.webSocketService = new WebSocketServiceClientDummyImpl();
213             }
214             // DCAE
215             this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
216
217             this.aaiProviderClient = new AaiProviderClient(config, this);
218             // EM
219             EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
220
221             if (emConfig == null) {
222                 LOG.warn("No configuration available. Don't start event manager");
223             } else {
224                 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
225
226                 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
227
228                 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
229                         databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
230             }
231             this.archiveCleanService = new ArchiveCleanService(config, databaseClientEvents, mwtnService);
232             this.cssRegistration = this.clusterSingletonServiceProvider.registerClusterSingletonService(this.archiveCleanService);
233             // PM
234             PmConfig configurationPM = config.getPm();
235             LOG.info("Performance manager configuration: {}", configurationPM);
236             if (!configurationPM.isPerformanceManagerEnabled()) {
237
238                 LOG.info("No configuration available. Don't start performance manager");
239             } else {
240                 @Nullable
241                 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
242                 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
243                 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
244             }
245
246             // DUS (Database update service)
247             LOG.debug("start db update service");
248             this.updateService =
249                     new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
250             this.updateService.start();
251
252             // RPC Service for specific services
253             this.rpcApiService.setMaintenanceService(this.maintenanceService);
254             this.rpcApiService.setResyncListener(this);
255             // DM
256             // DeviceMonitor has to be available before netconfSubscriptionManager is
257             // configured
258             LOG.debug("start DeviceMonitor Service");
259             this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener, config);
260
261             // netconfSubscriptionManager should be the last one because this is a callback
262             // service
263             LOG.debug("start NetconfSubscriptionManager Service");
264             // this.netconfSubscriptionManager = new
265             // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
266             // this.netconfSubscriptionManager.register();
267             this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
268             this.netconfChangeListener.register();
269
270             this.devicemanagerInitializationOk = true;
271         }
272         LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
273     }
274
275     @Override
276     public void close() throws Exception {
277         LOG.info("DeviceManagerImpl closing ...");
278
279         close(performanceManager);
280         close(dcaeProviderClient);
281         close(aaiProviderClient);
282         close(aotsMProvider);
283         close(deviceMonitor);
284         close(updateService, configService, mwtnService);
285         close(htDatabase);
286         close(netconfChangeListener);
287         close(maintenanceService);
288         close(rpcApiService);
289         close(notificationDelayService);
290         close(archiveCleanService);
291         LOG.info("DeviceManagerImpl closing done");
292     }
293
294
295     /**
296      * Used to close all Services, that should support AutoCloseable Pattern
297      *
298      * @param toClose
299      * @throws Exception
300      */
301     private void close(AutoCloseable... toCloseList) throws Exception {
302         for (AutoCloseable element : toCloseList) {
303             if (element != null) {
304                 element.close();
305             }
306         }
307     }
308
309     /*-------------------------------------------------------------------------------------------
310      * Functions for interface DeviceManagerService
311      */
312
313     /**
314      * For each mounted device a mountpoint is created and this listener is called.
315      * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
316      * @param action provide action
317      * @param nNodeId id of the mountpoint
318      * @param nNode mountpoint contents
319      */
320     public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
321
322         String mountPointNodeName = nNodeId.getValue();
323         LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
324
325         boolean preConditionMissing = false;
326         if (mountPointService == null) {
327             preConditionMissing = true;
328             LOG.warn("No mountservice available.");
329         }
330         if (!devicemanagerInitializationOk) {
331             preConditionMissing = true;
332             LOG.warn("Devicemanager initialization still pending.");
333         }
334         if (preConditionMissing) {
335             return;
336         }
337
338         if (!isNetconfNodeMaster(nNode)) {
339             // Change Devicemonitor-status to connected ... for non master mountpoints.
340             deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
341                 } else {
342
343                         InstanceIdentifier<Node> instanceIdentifier = NETCONF_TOPO_IID.child(Node.class,
344                                         new NodeKey(new NodeId(mountPointNodeName)));
345
346                         Optional<MountPoint> optionalMountPoint = null;
347                         int timeout = 10000;
348                         while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent()
349                                         && timeout > 0) {
350                                 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
351                                 sleepMs(1000);
352                                 timeout -= 1000;
353                         }
354
355                         if (!optionalMountPoint.isPresent()) {
356                                 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
357                                                 mountPointNodeName);
358                         } else {
359                                 // Mountpoint is present for sure
360                                 MountPoint mountPoint = optionalMountPoint.get();
361                                 // BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
362                                 LOG.info("Mountpoint with id: {} class {} toString {}", mountPoint.getIdentifier(),
363                                                 mountPoint.getClass().getName(), mountPoint);
364                                 Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
365
366                                 if (!optionalNetconfNodeDatabroker.isPresent()) {
367                                         LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
368                                 } else {
369
370                                         // It is master for mountpoint and all data are available.
371                                         // Make sure that specific mountPointNodeName is handled only once.
372                                         // be aware that startListenerOnNodeForConnectedState could be called multiple
373                                         // times for same mountPointNodeName.
374                                         // networkElementRepresentations contains handled NEs at master node.
375
376                                         synchronized (networkelementLock) {
377                                                 if (networkElementRepresentations.containsKey(mountPointNodeName)) {
378                                                         LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
379                                                         return;
380                                                 } else {
381                                                         ONFCoreNetworkElementRepresentation result = networkElementRepresentations.put(mountPointNodeName,
382                                                                         networkelementLock);
383                                                         if (result != null) {
384                                                                 LOG.info("Expected null value was not provided, but {}", result.getMountPointNodeName());
385                                                         }
386                                                 }
387                                         }
388
389                                         DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
390                                         LOG.info("Master mountpoint {}", mountPointNodeName);
391                                         // getNodeInfoTest(dataBroker);
392
393                                         // create automatic empty maintenance entry into db before reading and listening
394                                         // for problems
395                                         this.maintenanceService.createIfNotExists(mountPointNodeName);
396
397                                         // Setup microwaveEventListener for Notificationservice
398
399                                         // MicrowaveEventListener microwaveEventListener = new
400                                         // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
401                                         // xmlMapper, databaseClientEvents);
402                                         ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName,
403                                                         dataBroker, webSocketService, databaseClientEvents, instanceIdentifier,
404                                                         netconfNodeDataBroker, dcaeProviderClient, aotsMProvider, maintenanceService,
405                                                         notificationDelayService);
406
407                                         synchronized (networkelementLock) {
408                                                 ONFCoreNetworkElementRepresentation result = networkElementRepresentations
409                                                                 .put(mountPointNodeName, ne);
410                                                 if (result != networkelementLock) {
411                                                         LOG.info("NE list does not provide lock as epxected, but {}.",
412                                                                         result.getMountPointNodeName());
413                                                 }
414                                         }
415                                         ne.doRegisterMicrowaveEventListener(mountPoint);
416
417                                         // Register netconf stream
418                                         registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
419
420                                         // -- Read data from NE
421                                         ne.initialReadFromNetworkElement();
422                                         ne.initSynchronizationExtension();
423
424                                         sendUpdateNotification(mountPointNodeName, nNode.getConnectionStatus());
425
426                                         if (aaiProviderClient != null) {
427                                                 aaiProviderClient.onDeviceRegistered(mountPointNodeName);
428                                         }
429                                         // -- Register NE to performance manager
430                                         if (performanceManager != null) {
431                                                 performanceManager.registration(mountPointNodeName, ne);
432                                         }
433
434                                         deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
435
436                                         LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
437                                 }
438                         }
439                 }
440         }
441
442     /**
443      * Mountpoint created or existing. Managed device is actually disconnected from node/ mountpoint.
444      * Origin state: Connecting, Connected
445      * Target state: are UnableToConnect or Connecting
446      * @param action create or update
447      * @param nNodeId id of the mountpoint
448      * @param nNode mountpoint contents
449      */
450     public void enterNonConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
451         String mountPointNodeName = nNodeId.getValue();
452         ConnectionStatus csts = nNode.getConnectionStatus();
453         if (isNetconfNodeMaster(nNode)) {
454                 sendUpdateNotification(mountPointNodeName, csts);
455         }
456
457         // Handling if mountpoint exist. connected -> connecting/UnableToConnect
458         stopListenerOnNodeForConnectedState(mountPointNodeName);
459
460         deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
461
462     }
463
464     /**
465      * Mountpoint removed indication.
466      * @param nNodeId id of the mountpoint
467      */
468     public void removeMountpointState(NodeId nNodeId) {
469         String mountPointNodeName = nNodeId.getValue();
470         LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
471
472         stopListenerOnNodeForConnectedState(mountPointNodeName);
473         deviceMonitor.removeMountpointIndication(mountPointNodeName);
474         if (odlEventListener != null) {
475             odlEventListener.deRegistration(mountPointNodeName);
476         }
477     }
478
479     /**
480      * Do all tasks necessary to move from mountpoint state connected -> connecting
481      * @param mountPointNodeName provided
482      * @param ne representing the device connected to mountpoint
483      */
484     private void stopListenerOnNodeForConnectedState( String mountPointNodeName) {
485         ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
486         if (ne != null) {
487             this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
488             int problems = ne.removeAllCurrentProblemsOfNode();
489             LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
490             if (performanceManager != null) {
491                 performanceManager.deRegistration(mountPointNodeName);
492             }
493             if (aaiProviderClient != null) {
494                 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
495             }
496         }
497     }
498
499     private void sendUpdateNotification(String mountPointNodeName, ConnectionStatus csts) {
500         LOG.info("enter Non ConnectedState for device :: Name : {} ConnectionStatus {}", mountPointNodeName, csts);
501         if (odlEventListener != null) {
502                 odlEventListener.updateRegistration(mountPointNodeName, ConnectionStatus.class.getSimpleName(), csts != null ? csts.getName() : "null");
503         }
504     }
505
506     /**
507      * Handle netconf/mountpoint changes
508      */
509     @Override
510     public void netconfChangeHandler(Action action, @Nullable ConnectionStatus csts, NodeId nodeId, NetconfNode nNode) {
511
512                 ClusteredConnectionStatus ccsts = nNode.getClusteredConnectionStatus();
513                 String nodeIdString = nodeId.getValue();
514                 if (action == Action.CREATE) {
515                 if (odlEventListener != null) {
516                         odlEventListener.registration(nodeIdString);
517                 }
518                 }
519                 boolean isCluster = akkaConfig == null && akkaConfig.isCluster();
520                 if (isCluster && ccsts == null) {
521                         LOG.debug("NETCONF Node {} {} does not provide cluster status. Stop execution.", nodeIdString, action);
522                 } else {
523                         switch (action) {
524                         case REMOVE:
525                                 removeMountpointState(nodeId); // Stop Monitor
526                                 break;
527
528                         case UPDATE:
529                         case CREATE:
530                                 if (csts != null) {
531                                         switch (csts) {
532                                         case Connected: {
533                                                 startListenerOnNodeForConnectedState(action, nodeId, nNode);
534                                                 break;
535                                         }
536                                         case UnableToConnect:
537                                         case Connecting: {
538                                                 enterNonConnectedState(action, nodeId, nNode);
539                                                 break;
540                                         }
541                                         }
542                                 } else {
543                                         LOG.debug("NETCONF Node handled with null status for action", action);
544                                 }
545                                 break;
546                         }
547                 }
548     }
549
550     /*-------------------------------------------------------------------------------------------
551      * Functions
552      */
553
554     public ArchiveCleanService getArchiveCleanService() {
555         return this.archiveCleanService;
556     }
557
558     public HtDatabaseEventsService getDatabaseClientEvents() {
559         return databaseClientEvents;
560     }
561
562     public IndexMwtnService getMwtnService() {
563         return mwtnService;
564     }
565
566     /**
567      * Async RPC Interface implementation
568      */
569     @Override
570     public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
571             throws IllegalStateException {
572
573         if (this.databaseClientEvents == null) {
574             throw new IllegalStateException("dbEvents service not instantiated");
575         }
576
577         if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
578             throw new IllegalStateException("A clear task is already active");
579         } else {
580
581             // Create list of mountpoints if input is empty, using the content in ES
582             if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
583                 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
584             }
585
586             // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
587             // DeviceManager
588             final List<String> nodeNamesHandled = new ArrayList<>();
589             for (String mountpointName : nodeNamesInput) {
590                 LOG.info("Work with mountpoint {}", mountpointName);
591
592                 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
593
594                     // SDN Controller related alarms
595                     // -- can not be recreated on all nodes in connected state
596                     // -- would result in a DCAE/AAI Notification
597                     // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
598                     LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
599                     // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
600                     // nodeNamesHandled.add(mountpointName);
601
602                 } else {
603
604                     if (mountPointService != null) {
605                         InstanceIdentifier<Node> instanceIdentifier =
606                                 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
607                         Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
608
609                         if (!optionalMountPoint.isPresent()) {
610                             LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
611                             this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
612                             nodeNamesHandled.add(mountpointName);
613                         } else {
614                             if (networkElementRepresentations.containsKey(mountpointName)) {
615                                 LOG.info("At node known mountpoint {}", mountpointName);
616                                 nodeNamesHandled.add(mountpointName);
617                             } else {
618                                 LOG.info("At node unknown mountpoint {}", mountpointName);
619                             }
620                         }
621                     }
622                 }
623             }
624
625             // Force a sync
626             this.deviceMonitor.refreshAlarmsInDb();
627
628             threadDoClearCurrentFaultByNodename = new Thread(() -> {
629                 refreshCounter++;
630                 LOG.info("Start refresh mountpoint task {}", refreshCounter);
631                 // for(String nodeName:nodeNamesOutput) {
632                 for (String nodeName : nodeNamesHandled) {
633                     ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
634                     if (ne != null) {
635                         LOG.info("Refresh mountpoint {}", nodeName);
636                         ne.initialReadFromNetworkElement();
637                     } else {
638                         LOG.info("Unhandled mountpoint {}", nodeName);
639                     }
640                 }
641                 LOG.info("End refresh mountpoint task {}", refreshCounter);
642             });
643             threadDoClearCurrentFaultByNodename.start();
644             return nodeNamesHandled;
645         }
646     };
647
648     /**
649      * Indication if init() of devicemanager successfully done.
650      *
651      * @return true if init() was sucessfull. False if not done or not successfull.
652      */
653     public boolean isDevicemanagerInitializationOk() {
654         return this.devicemanagerInitializationOk;
655     }
656
657     /**
658      * Get initialization status of database.
659      *
660      * @return true if fully initialized false if not
661      */
662     public boolean isDatabaseInitializationFinished() {
663         return htDatabase == null ? false : htDatabase.getInitialized();
664     }
665
666     /*---------------------------------------------------------------------
     * Private functions
668      */
669
670     /**
671      * Do the stream creation for the device.
672      *
673      * @param mountPointNodeName
674      * @param mountPoint
675      */
676     private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
677
678         final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
679                 mountPoint.getService(RpcConsumerRegistry.class);
680         if (optionalRpcConsumerService.isPresent()) {
681             final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
682             final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
683             if (rpcService == null) {
684                 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
685             } else {
686                 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
687                         new CreateSubscriptionInputBuilder();
688                 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
689                 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
690                         mountPointNodeName);
691                 try {
692                     CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
693                     if (createSubscriptionInput == null) {
694                         LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
695                     } else {
696                         rpcService.createSubscription(createSubscriptionInput);
697                     }
698                 } catch (NullPointerException e) {
699                     LOG.warn("createSubscription failed");
700                 }
701             }
702         } else {
703             LOG.warn("No RpcConsumerRegistry avaialble.");
704         }
705
706     }
707
708     /**
709      * Get NE object
710      *
711      * @param mountpoint mount point name
712      * @return null or NE specific data
713      */
714     public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
715
716         return networkElementRepresentations.get(mountpoint);
717
718     }
719
720     /* -- LOG related functions -- */
721
722
723     private boolean isInClusterMode() {
724         return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
725     }
726
727     private String getClusterNetconfNodeName() {
728         return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
729     }
730
731     private boolean isNetconfNodeMaster(NetconfNode nnode) {
732         if (isInClusterMode()) {
733             LOG.debug("check if me is responsible for node");
734             String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
735                     : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
736             String myNodeName = getClusterNetconfNodeName();
737             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
738             if (!masterNodeName.equals(myNodeName)) {
739                 LOG.debug("netconf change but me is not master for this node");
740                 return false;
741             }
742         }
743         return true;
744     }
745
746
747     private void sleepMs(int milliseconds) {
748         try {
749             Thread.sleep(milliseconds);
750         } catch (InterruptedException e) {
751             LOG.debug("Interrupted sleep");
752             // Restore interrupted state...
753             Thread.currentThread().interrupt();
754         }
755     }
756
757 }