1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl;
20 import com.google.common.base.Optional;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.aaiconnector.impl.AaiProviderClient;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.archiveservice.ArchiveCleanService;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database.HtDatabaseNode;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementFactory;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.toggleAlarmFilter.NotificationDelayService;
32 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.HtDevicemanagerConfiguration;
33 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
35 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.PmConfig;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.dcaeconnector.impl.DcaeProviderClient;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitor;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorEmptyImpl;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.devicemonitor.impl.DeviceMonitorImpl;
41 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.database.service.HtDatabaseEventsService;
42 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.NetconfChangeListener;
43 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.listener.ODLEventListener;
44 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.ProblemNotificationXml;
45 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClient;
46 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientDummyImpl;
47 import org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml.WebSocketServiceClientImpl2;
48 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexConfigService;
49 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexMwtnService;
50 import org.onap.ccsdk.features.sdnr.wt.devicemanager.index.impl.IndexUpdateService;
51 import org.onap.ccsdk.features.sdnr.wt.devicemanager.maintenance.impl.MaintenanceServiceImpl;
52 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.PerformanceManagerImpl;
53 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
54 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
55 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
56 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
57 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
58 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
59 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
61 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInputBuilder;
62 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
63 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
79 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
81 @SuppressWarnings("deprecation")
82 public class DeviceManagerImpl implements DeviceManagerService, AutoCloseable, ResyncNetworkElementsListener {
84 private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
85 private static final String APPLICATION_NAME = "DeviceManager";
86 private static final String MYDBKEYNAMEBASE = "SDN-Controller";
88 // http://sendateodl:8181/restconf/operational/network-topology:network-topology/topology/topology-netconf
89 private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
90 InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
91 new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
92 @SuppressWarnings("unused")
93 private static final String STARTUPLOG_FILENAME = "etc/devicemanager.startup.log";
94 // private static final String STARTUPLOG_FILENAME2 = "data/cache/devicemanager.startup.log";
96 private DataBroker dataBroker = null;
97 private MountPointService mountPointService = null;
98 private RpcProviderRegistry rpcProviderRegistry = null;
99 @SuppressWarnings("unused")
100 private NotificationPublishService notificationPublishService = null;
101 private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
103 private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> networkElementRepresentations =
104 new ConcurrentHashMap<>();
105 private final ONFCoreNetworkElementRepresentation networkelementLock = ONFCoreNetworkElementFactory.getEmpty("NE-LOCK");
106 private WebSocketServiceClient webSocketService;
107 private HtDatabaseEventsService databaseClientEvents;
108 private ODLEventListener odlEventListener;
109 private NetconfChangeListener netconfChangeListener;
110 private DeviceManagerApiServiceImpl rpcApiService;
111 private @Nullable PerformanceManagerImpl performanceManager = null;
112 private ProviderClient dcaeProviderClient;
113 private ProviderClient aotsMProvider;
114 private @Nullable AaiProviderClient aaiProviderClient = null;
115 private @Nullable DeviceMonitor deviceMonitor = new DeviceMonitorEmptyImpl();
116 private IndexUpdateService updateService;
117 private IndexConfigService configService;
118 private IndexMwtnService mwtnService;
119 private HtDatabaseNode htDatabase;
120 private Boolean devicemanagerInitializationOk = false;
121 private MaintenanceServiceImpl maintenanceService;
122 private NotificationDelayService<ProblemNotificationXml> notificationDelayService;
123 private Thread threadDoClearCurrentFaultByNodename = null;
124 private int refreshCounter = 0;
125 private AkkaConfig akkaConfig;
126 private ArchiveCleanService archiveCleanService;
127 @SuppressWarnings("unused")
128 private ClusterSingletonServiceRegistration cssRegistration;
/**
 * Creates the provider. Only logs the creation; no service is started here.
 */
public DeviceManagerImpl() {
    LOG.info("Creating provider for {}", APPLICATION_NAME);
}
135 public void setDataBroker(DataBroker dataBroker) {
136 this.dataBroker = dataBroker;
139 public void setRpcProviderRegistry(RpcProviderRegistry rpcProviderRegistry) {
140 this.rpcProviderRegistry = rpcProviderRegistry;
143 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
144 this.notificationPublishService = notificationPublishService;
147 public void setMountPointService(MountPointService mountPointService) {
148 this.mountPointService = mountPointService;
150 public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
151 this.clusterSingletonServiceProvider = clusterSingletonService;
155 LOG.info("Session Initiated start {}", APPLICATION_NAME);
158 this.rpcApiService = new DeviceManagerApiServiceImpl(rpcProviderRegistry);
160 HtDevicemanagerConfiguration config = HtDevicemanagerConfiguration.getConfiguration();
162 this.akkaConfig = AkkaConfig.load();
163 LOG.debug("akka.conf loaded: " + akkaConfig.toString());
164 } catch (Exception e1) {
165 this.akkaConfig = null;
166 LOG.warn("problem loading akka.conf: " + e1.getMessage());
168 GeoConfig geoConfig = null;
169 if (akkaConfig != null && akkaConfig.isCluster()) {
170 LOG.info("cluster mode detected");
171 if (GeoConfig.fileExists()) {
173 LOG.debug("try to load geoconfig");
174 geoConfig = GeoConfig.load();
175 } catch (Exception err) {
176 LOG.warn("problem loading geoconfig: " + err.getMessage());
179 LOG.debug("no geoconfig file found");
182 LOG.info("single node mode detected");
185 this.notificationDelayService = new NotificationDelayService<>(config);
187 EsConfig dbConfig = config.getEs();
188 LOG.debug("esConfig=" + dbConfig.toString());
190 htDatabase = HtDatabaseNode.start(dbConfig, akkaConfig, geoConfig);
191 if (htDatabase == null) {
192 LOG.error("Can only run with local database. Stop initialization of devicemanager.");
194 // init Database Values only if singleNode or clusterMember=1
195 if (akkaConfig == null || akkaConfig.isClusterAndFirstNode()) {
196 // Create DB index if not existing and if database is running
198 this.configService = new IndexConfigService(htDatabase);
199 this.mwtnService = new IndexMwtnService(htDatabase);
200 } catch (Exception e) {
201 LOG.warn("Can not start ES access clients to provide database index config, mwtn. ", e);
204 // start service for device maintenance service
205 this.maintenanceService = new MaintenanceServiceImpl(htDatabase);
209 this.webSocketService = new WebSocketServiceClientImpl2(rpcProviderRegistry);
210 } catch (Exception e) {
211 LOG.error("Can not start websocket service. Loading mock class.", e);
212 this.webSocketService = new WebSocketServiceClientDummyImpl();
215 this.dcaeProviderClient = new DcaeProviderClient(config, dbConfig.getCluster(), this);
217 this.aaiProviderClient = new AaiProviderClient(config, this);
219 EsConfig emConfig = dbConfig.cloneWithIndex("sdnevents");
221 if (emConfig == null) {
222 LOG.warn("No configuration available. Don't start event manager");
224 this.databaseClientEvents = new HtDatabaseEventsService(htDatabase);
226 String myDbKeyNameExtended = MYDBKEYNAMEBASE + "-" + dbConfig.getCluster();
228 this.odlEventListener = new ODLEventListener(myDbKeyNameExtended, webSocketService,
229 databaseClientEvents, dcaeProviderClient, aotsMProvider, maintenanceService);
231 this.archiveCleanService = new ArchiveCleanService(config, databaseClientEvents, mwtnService);
232 this.cssRegistration = this.clusterSingletonServiceProvider.registerClusterSingletonService(this.archiveCleanService);
234 PmConfig configurationPM = config.getPm();
235 LOG.info("Performance manager configuration: {}", configurationPM);
236 if (!configurationPM.isPerformanceManagerEnabled()) {
238 LOG.info("No configuration available. Don't start performance manager");
241 MicrowaveHistoricalPerformanceWriterService databaseClientHistoricalPerformance;
242 databaseClientHistoricalPerformance = new MicrowaveHistoricalPerformanceWriterService(htDatabase);
243 this.performanceManager = new PerformanceManagerImpl(60, databaseClientHistoricalPerformance);
246 // DUS (Database update service)
247 LOG.debug("start db update service");
249 new IndexUpdateService(htDatabase, dbConfig.getHost(), dbConfig.getCluster(), dbConfig.getNode());
250 this.updateService.start();
252 // RPC Service for specific services
253 this.rpcApiService.setMaintenanceService(this.maintenanceService);
254 this.rpcApiService.setResyncListener(this);
256 // DeviceMonitor has to be available before netconfSubscriptionManager is
258 LOG.debug("start DeviceMonitor Service");
259 this.deviceMonitor = new DeviceMonitorImpl(dataBroker, odlEventListener, config);
261 // netconfSubscriptionManager should be the last one because this is a callback
263 LOG.debug("start NetconfSubscriptionManager Service");
264 // this.netconfSubscriptionManager = new
265 // NetconfSubscriptionManagerOfDeviceManager(this, dataBroker);
266 // this.netconfSubscriptionManager.register();
267 this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
268 this.netconfChangeListener.register();
270 this.devicemanagerInitializationOk = true;
272 LOG.info("Session Initiated end. Initialization done {}", devicemanagerInitializationOk);
276 public void close() throws Exception {
277 LOG.info("DeviceManagerImpl closing ...");
279 close(performanceManager);
280 close(dcaeProviderClient);
281 close(aaiProviderClient);
282 close(aotsMProvider);
283 close(deviceMonitor);
284 close(updateService, configService, mwtnService);
286 close(netconfChangeListener);
287 close(maintenanceService);
288 close(rpcApiService);
289 close(notificationDelayService);
290 close(archiveCleanService);
291 LOG.info("DeviceManagerImpl closing done");
296 * Used to close all Services, that should support AutoCloseable Pattern
301 private void close(AutoCloseable... toCloseList) throws Exception {
302 for (AutoCloseable element : toCloseList) {
303 if (element != null) {
309 /*-------------------------------------------------------------------------------------------
310 * Functions for interface DeviceManagerService
314 * For each mounted device a mountpoint is created and this listener is called.
315 * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
316 * @param action provide action
317 * @param nNodeId id of the mountpoint
318 * @param nNode mountpoint contents
320 public void startListenerOnNodeForConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
322 String mountPointNodeName = nNodeId.getValue();
323 LOG.info("Starting Event listener on Netconf for mountpoint {} Action {}", mountPointNodeName, action);
325 boolean preConditionMissing = false;
326 if (mountPointService == null) {
327 preConditionMissing = true;
328 LOG.warn("No mountservice available.");
330 if (!devicemanagerInitializationOk) {
331 preConditionMissing = true;
332 LOG.warn("Devicemanager initialization still pending.");
334 if (preConditionMissing) {
338 if (!isNetconfNodeMaster(nNode)) {
339 // Change Devicemonitor-status to connected ... for non master mountpoints.
340 deviceMonitor.deviceConnectSlaveIndication(mountPointNodeName);
343 InstanceIdentifier<Node> instanceIdentifier = NETCONF_TOPO_IID.child(Node.class,
344 new NodeKey(new NodeId(mountPointNodeName)));
346 Optional<MountPoint> optionalMountPoint = null;
348 while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent()
350 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
355 if (!optionalMountPoint.isPresent()) {
356 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
359 // Mountpoint is present for sure
360 MountPoint mountPoint = optionalMountPoint.get();
361 // BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
362 LOG.info("Mountpoint with id: {} class {} toString {}", mountPoint.getIdentifier(),
363 mountPoint.getClass().getName(), mountPoint);
364 Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
366 if (!optionalNetconfNodeDatabroker.isPresent()) {
367 LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
370 // It is master for mountpoint and all data are available.
371 // Make sure that specific mountPointNodeName is handled only once.
372 // be aware that startListenerOnNodeForConnectedState could be called multiple
373 // times for same mountPointNodeName.
374 // networkElementRepresentations contains handled NEs at master node.
376 synchronized (networkelementLock) {
377 if (networkElementRepresentations.containsKey(mountPointNodeName)) {
378 LOG.warn("Mountpoint {} already registered. Leave startup procedure.", mountPointNodeName);
381 ONFCoreNetworkElementRepresentation result = networkElementRepresentations.put(mountPointNodeName,
383 if (result != null) {
384 LOG.info("Expected null value was not provided, but {}", result.getMountPointNodeName());
389 DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
390 LOG.info("Master mountpoint {}", mountPointNodeName);
391 // getNodeInfoTest(dataBroker);
393 // create automatic empty maintenance entry into db before reading and listening
395 this.maintenanceService.createIfNotExists(mountPointNodeName);
397 // Setup microwaveEventListener for Notificationservice
399 // MicrowaveEventListener microwaveEventListener = new
400 // MicrowaveEventListener(mountPointNodeName, websocketmanagerService,
401 // xmlMapper, databaseClientEvents);
402 ONFCoreNetworkElementRepresentation ne = ONFCoreNetworkElementFactory.create(mountPointNodeName,
403 dataBroker, webSocketService, databaseClientEvents, instanceIdentifier,
404 netconfNodeDataBroker, dcaeProviderClient, aotsMProvider, maintenanceService,
405 notificationDelayService);
407 synchronized (networkelementLock) {
408 ONFCoreNetworkElementRepresentation result = networkElementRepresentations
409 .put(mountPointNodeName, ne);
410 if (result != networkelementLock) {
411 LOG.info("NE list does not provide lock as epxected, but {}.",
412 result.getMountPointNodeName());
415 ne.doRegisterMicrowaveEventListener(mountPoint);
417 // Register netconf stream
418 registerNotificationStream(mountPointNodeName, mountPoint, "NETCONF");
420 // -- Read data from NE
421 ne.initialReadFromNetworkElement();
422 ne.initSynchronizationExtension();
424 sendUpdateNotification(mountPointNodeName, nNode.getConnectionStatus());
426 if (aaiProviderClient != null) {
427 aaiProviderClient.onDeviceRegistered(mountPointNodeName);
429 // -- Register NE to performance manager
430 if (performanceManager != null) {
431 performanceManager.registration(mountPointNodeName, ne);
434 deviceMonitor.deviceConnectMasterIndication(mountPointNodeName, ne);
436 LOG.info("Starting Event listener on Netconf device :: Name : {} finished", mountPointNodeName);
443 * Mountpoint created or existing. Managed device is actually disconnected from node/ mountpoint.
444 * Origin state: Connecting, Connected
445 * Target state: are UnableToConnect or Connecting
446 * @param action create or update
447 * @param nNodeId id of the mountpoint
448 * @param nNode mountpoint contents
450 public void enterNonConnectedState(Action action, NodeId nNodeId, NetconfNode nNode) {
451 String mountPointNodeName = nNodeId.getValue();
452 ConnectionStatus csts = nNode.getConnectionStatus();
453 if (isNetconfNodeMaster(nNode)) {
454 sendUpdateNotification(mountPointNodeName, csts);
457 // Handling if mountpoint exist. connected -> connecting/UnableToConnect
458 stopListenerOnNodeForConnectedState(mountPointNodeName);
460 deviceMonitor.deviceDisconnectIndication(mountPointNodeName);
465 * Mountpoint removed indication.
466 * @param nNodeId id of the mountpoint
468 public void removeMountpointState(NodeId nNodeId) {
469 String mountPointNodeName = nNodeId.getValue();
470 LOG.info("mountpointNodeRemoved {}", nNodeId.getValue());
472 stopListenerOnNodeForConnectedState(mountPointNodeName);
473 deviceMonitor.removeMountpointIndication(mountPointNodeName);
474 if (odlEventListener != null) {
475 odlEventListener.deRegistration(mountPointNodeName);
480 * Do all tasks necessary to move from mountpoint state connected -> connecting
481 * @param mountPointNodeName provided
482 * @param ne representing the device connected to mountpoint
484 private void stopListenerOnNodeForConnectedState( String mountPointNodeName) {
485 ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.remove(mountPointNodeName);
487 this.maintenanceService.deleteIfNotRequired(mountPointNodeName);
488 int problems = ne.removeAllCurrentProblemsOfNode();
489 LOG.debug("Removed all {} problems from database at deregistration for {}", problems, mountPointNodeName);
490 if (performanceManager != null) {
491 performanceManager.deRegistration(mountPointNodeName);
493 if (aaiProviderClient != null) {
494 aaiProviderClient.onDeviceUnregistered(mountPointNodeName);
499 private void sendUpdateNotification(String mountPointNodeName, ConnectionStatus csts) {
500 LOG.info("enter Non ConnectedState for device :: Name : {} ConnectionStatus {}", mountPointNodeName, csts);
501 if (odlEventListener != null) {
502 odlEventListener.updateRegistration(mountPointNodeName, ConnectionStatus.class.getSimpleName(), csts != null ? csts.getName() : "null");
507 * Handle netconf/mountpoint changes
510 public void netconfChangeHandler(Action action, @Nullable ConnectionStatus csts, NodeId nodeId, NetconfNode nNode) {
512 ClusteredConnectionStatus ccsts = nNode.getClusteredConnectionStatus();
513 String nodeIdString = nodeId.getValue();
514 if (action == Action.CREATE) {
515 if (odlEventListener != null) {
516 odlEventListener.registration(nodeIdString);
519 boolean isCluster = akkaConfig == null && akkaConfig.isCluster();
520 if (isCluster && ccsts == null) {
521 LOG.debug("NETCONF Node {} {} does not provide cluster status. Stop execution.", nodeIdString, action);
525 removeMountpointState(nodeId); // Stop Monitor
533 startListenerOnNodeForConnectedState(action, nodeId, nNode);
536 case UnableToConnect:
538 enterNonConnectedState(action, nodeId, nNode);
543 LOG.debug("NETCONF Node handled with null status for action", action);
550 /*-------------------------------------------------------------------------------------------
554 public ArchiveCleanService getArchiveCleanService() {
555 return this.archiveCleanService;
558 public HtDatabaseEventsService getDatabaseClientEvents() {
559 return databaseClientEvents;
562 public IndexMwtnService getMwtnService() {
567 * Async RPC Interface implementation
570 public @Nonnull List<String> doClearCurrentFaultByNodename(@Nullable List<String> nodeNamesInput)
571 throws IllegalStateException {
573 if (this.databaseClientEvents == null) {
574 throw new IllegalStateException("dbEvents service not instantiated");
577 if (threadDoClearCurrentFaultByNodename != null && threadDoClearCurrentFaultByNodename.isAlive()) {
578 throw new IllegalStateException("A clear task is already active");
581 // Create list of mountpoints if input is empty, using the content in ES
582 if (nodeNamesInput == null || nodeNamesInput.size() <= 0) {
583 nodeNamesInput = this.databaseClientEvents.getAllNodesWithCurrentAlarms();
586 // Filter all mountpoints from input that were found and are known to this Cluster-node instance of
588 final List<String> nodeNamesHandled = new ArrayList<>();
589 for (String mountpointName : nodeNamesInput) {
590 LOG.info("Work with mountpoint {}", mountpointName);
592 if (odlEventListener != null && mountpointName.equals(odlEventListener.getOwnKeyName())) {
594 // SDN Controller related alarms
595 // -- can not be recreated on all nodes in connected state
596 // -- would result in a DCAE/AAI Notification
597 // Conclusion for 1810 Delivery ... not covered by RPC function (See issue #43)
598 LOG.info("Ignore SDN Controller related alarms for {}", mountpointName);
599 // this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
600 // nodeNamesHandled.add(mountpointName);
604 if (mountPointService != null) {
605 InstanceIdentifier<Node> instanceIdentifier =
606 NETCONF_TOPO_IID.child(Node.class, new NodeKey(new NodeId(mountpointName)));
607 Optional<MountPoint> optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier);
609 if (!optionalMountPoint.isPresent()) {
610 LOG.info("Remove Alarms for unknown mountpoint {}", mountpointName);
611 this.databaseClientEvents.clearFaultsCurrentOfNode(mountpointName);
612 nodeNamesHandled.add(mountpointName);
614 if (networkElementRepresentations.containsKey(mountpointName)) {
615 LOG.info("At node known mountpoint {}", mountpointName);
616 nodeNamesHandled.add(mountpointName);
618 LOG.info("At node unknown mountpoint {}", mountpointName);
626 this.deviceMonitor.refreshAlarmsInDb();
628 threadDoClearCurrentFaultByNodename = new Thread(() -> {
630 LOG.info("Start refresh mountpoint task {}", refreshCounter);
631 // for(String nodeName:nodeNamesOutput) {
632 for (String nodeName : nodeNamesHandled) {
633 ONFCoreNetworkElementRepresentation ne = networkElementRepresentations.get(nodeName);
635 LOG.info("Refresh mountpoint {}", nodeName);
636 ne.initialReadFromNetworkElement();
638 LOG.info("Unhandled mountpoint {}", nodeName);
641 LOG.info("End refresh mountpoint task {}", refreshCounter);
643 threadDoClearCurrentFaultByNodename.start();
644 return nodeNamesHandled;
649 * Indication if init() of devicemanager successfully done.
651 * @return true if init() was sucessfull. False if not done or not successfull.
653 public boolean isDevicemanagerInitializationOk() {
654 return this.devicemanagerInitializationOk;
658 * Get initialization status of database.
660 * @return true if fully initialized false if not
662 public boolean isDatabaseInitializationFinished() {
663 return htDatabase == null ? false : htDatabase.getInitialized();
666 /*---------------------------------------------------------------------
671 * Do the stream creation for the device.
673 * @param mountPointNodeName
676 private void registerNotificationStream(String mountPointNodeName, MountPoint mountPoint, String streamName) {
678 final Optional<RpcConsumerRegistry> optionalRpcConsumerService =
679 mountPoint.getService(RpcConsumerRegistry.class);
680 if (optionalRpcConsumerService.isPresent()) {
681 final RpcConsumerRegistry rpcConsumerRegitry = optionalRpcConsumerService.get();
682 final NotificationsService rpcService = rpcConsumerRegitry.getRpcService(NotificationsService.class);
683 if (rpcService == null) {
684 LOG.warn("rpcService is null for mountpoint {}", mountPointNodeName);
686 final CreateSubscriptionInputBuilder createSubscriptionInputBuilder =
687 new CreateSubscriptionInputBuilder();
688 createSubscriptionInputBuilder.setStream(new StreamNameType(streamName));
689 LOG.info("Event listener triggering notification stream {} for node {}", streamName,
692 CreateSubscriptionInput createSubscriptionInput = createSubscriptionInputBuilder.build();
693 if (createSubscriptionInput == null) {
694 LOG.warn("createSubscriptionInput is null for mountpoint {}", mountPointNodeName);
696 rpcService.createSubscription(createSubscriptionInput);
698 } catch (NullPointerException e) {
699 LOG.warn("createSubscription failed");
703 LOG.warn("No RpcConsumerRegistry avaialble.");
711 * @param mountpoint mount point name
712 * @return null or NE specific data
714 public @Nullable ONFCoreNetworkElementRepresentation getNeByMountpoint(String mountpoint) {
716 return networkElementRepresentations.get(mountpoint);
720 /* -- LOG related functions -- */
723 private boolean isInClusterMode() {
724 return this.akkaConfig == null ? false : this.akkaConfig.isCluster();
727 private String getClusterNetconfNodeName() {
728 return this.akkaConfig == null ? "" : this.akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
731 private boolean isNetconfNodeMaster(NetconfNode nnode) {
732 if (isInClusterMode()) {
733 LOG.debug("check if me is responsible for node");
734 String masterNodeName = nnode.getClusteredConnectionStatus() == null ? "null"
735 : nnode.getClusteredConnectionStatus().getNetconfMasterNode();
736 String myNodeName = getClusterNetconfNodeName();
737 LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + myNodeName);
738 if (!masterNodeName.equals(myNodeName)) {
739 LOG.debug("netconf change but me is not master for this node");
747 private void sleepMs(int milliseconds) {
749 Thread.sleep(milliseconds);
750 } catch (InterruptedException e) {
751 LOG.debug("Interrupted sleep");
752 // Restore interrupted state...
753 Thread.currentThread().interrupt();