988dbd257f8d1f277e536fbd3de194596c1afb93
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
19
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import javax.annotation.Nullable;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
28 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
29 import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory;
30 import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool;
31 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
32 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
33 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
34 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
36 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
37 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
38 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
39 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
40 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext;
41 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
46 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
47 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
48 import org.opendaylight.mdsal.binding.api.DataBroker;
49 import org.opendaylight.mdsal.binding.api.DataObjectModification;
50 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.api.DataTreeModification;
54 import org.opendaylight.mdsal.binding.api.MountPointService;
55 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
56 import org.opendaylight.mdsal.binding.api.RpcProviderService;
57 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
58 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
59 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
60 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
61 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
62 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
75 import org.opendaylight.yangtools.concepts.ListenerRegistration;
76 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
77 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 public class NetconfNodeStateServiceImpl
82         implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
83
84     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
85     private static final String APPLICATION_NAME = "NetconfNodeStateService";
86     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
87
88     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
89             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
90                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
91
92     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
93             InstanceIdentifier.create(NetworkTopology.class)
94                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
95                     .child(Node.class);
96
97     private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
98             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
99
100     // Name of ODL controller NETCONF instance
101     private static final NodeId CONTROLLER = new NodeId("controller-config");
102     private static final int ASYNC_EXECUTION_POOLSIZE = 20;
103
104     // -- OSGi services, provided
105     private DataBroker dataBroker;
106     private DOMDataBroker domDataBroker;
107     private MountPointService mountPointService;
108     private DOMMountPointService domMountPointService;
109     private RpcProviderService rpcProviderRegistry;
110     private IEntityDataProvider iEntityDataProvider;
111     @SuppressWarnings("unused")
112     private NotificationPublishService notificationPublishService;
113     @SuppressWarnings("unused")
114     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
115     private YangParserFactory yangParserFactory;
116     private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
117
118     // -- Parameter
119     private ListenerRegistration<L1> listenerL1;
120     private ListenerRegistration<L2> listenerL2;
121     @SuppressWarnings("unused")
122     private ClusterSingletonServiceRegistration cssRegistration;
123
124     private NetconfnodeStateServiceRpcApiImpl rpcApiService;
125
126     /** Indication if init() function called and fully executed **/
127     private Boolean initializationSuccessful;
128
129     /** Manager accessor objects for connection **/
130     private NetconfAccessorManager accessorManager;
131
132     /** List of all registered listeners **/
133     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
134
135     /** List of all registered listeners **/
136     private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
137
138     /** List of all registered listeners **/
139     private final List<VesNotificationListener> vesNotificationListenerList;
140
141     /** Indicates if running in cluster configuration **/
142     private boolean isCluster;
143
144     /** Indicates the name of the cluster **/
145     private String clusterName;
146
147     /** nodeId to threadPool (size=1) for datatreechange handling) **/
148     //    private final Map<String, ExecutorService> handlingPool;
149     private KeyBasedThreadpool<NodeId, NetconfChangeDataHolder> handlingPool;
150
151     private boolean handleDataTreeAsync;
152
153     private ConfigurationFileRepresentation configFileRepresentation;
154     private NetconfStateConfig config;
155     private NetconfCommunicatorManager netconfCommunicatorManager;
156     private DomContext domContext;
157
158     /** Blueprint **/
159     public NetconfNodeStateServiceImpl() {
160         LOG.info("Creating provider for {}", APPLICATION_NAME);
161
162         this.dataBroker = null;
163         this.domDataBroker = null;
164         this.mountPointService = null;
165         this.domMountPointService = null;
166         this.rpcProviderRegistry = null;
167         this.notificationPublishService = null;
168         this.clusterSingletonServiceProvider = null;
169         this.yangParserFactory = null;
170         this.domContext = null;
171
172         this.listenerL1 = null;
173         this.listenerL2 = null;
174         this.initializationSuccessful = false;
175         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
176         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
177         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
178         this.accessorManager = null;
179         this.handlingPool = null;
180     }
181     public void setDataBroker(DataBroker dataBroker) {
182         this.dataBroker = dataBroker;
183     }
184
185     public void setDomDataBroker(DOMDataBroker domDataBroker) {
186         this.domDataBroker = domDataBroker;
187     }
188
189     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
190         this.rpcProviderRegistry = rpcProviderRegistry;
191     }
192
193     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
194         this.notificationPublishService = notificationPublishService;
195     }
196
197     public void setMountPointService(MountPointService mountPointService) {
198         this.mountPointService = mountPointService;
199     }
200
201     public void setDomMountPointService(DOMMountPointService domMountPointService) {
202         this.domMountPointService = domMountPointService;
203     }
204
205     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
206         this.clusterSingletonServiceProvider = clusterSingletonService;
207     }
208
209     public void setEntityDataProvider(IEntityDataProvider iEntityDataProvider) {
210         this.iEntityDataProvider = iEntityDataProvider;
211     }
212
213     public void setYangParserFactory(YangParserFactory yangParserFactory) {
214         this.yangParserFactory = yangParserFactory;
215     }
216
217     public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
218         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
219     }
220
221     /**
222      * Blueprint initialization
223      *
224      * @throws YangParserException
225      **/
226     public void init() {
227
228         LOG.info("Session Initiated start {}", APPLICATION_NAME);
229         this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
230         this.netconfCommunicatorManager =
231                 new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
232         this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
233         // Start RPC Service
234         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
235         // Get configuration
236         this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
237         this.config = new NetconfStateConfig(this.configFileRepresentation);
238         this.handleDataTreeAsync = this.config.handleAsync();
239         this.configFileRepresentation.registerConfigChangedListener(this);
240
241         // Akka setup
242         AkkaConfig akkaConfig = getAkkaConfig();
243         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
244         this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
245
246         // Provide status information
247         ClusterConfig cc = akkaConfig == null ? null : akkaConfig.getClusterConfig();
248         this.iEntityDataProvider.setStatus(StatusKey.CLUSTER_SIZE,
249                 cc == null ? "1" : String.format("%d", cc.getClusterSize()));
250
251         // RPC Service for specific services
252         this.rpcApiService.setStatusCallback(this);
253
254         LOG.debug("start NetconfSubscriptionManager Service");
255         //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
256         //this.netconfChangeListener.register();
257         //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
258
259         listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
260         listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
261         this.handlingPool = new KeyBasedThreadpool<NodeId, NetconfChangeDataHolder>(this.config.getAsyncHandlingPoolsize(), 1,
262                 new GenericRunnableFactory<>() {
263                     public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) {
264                         return new Runnable() {
265
266                             @Override
267                             public void run() {
268                                 NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key,
269                                         arg.modificationTyp);
270                             }
271                         };
272                     };
273                 });
274         this.initializationSuccessful = true;
275
276         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
277
278     }
279
280     /** Blueprint destroy-method method */
281     public void destroy() {
282         close();
283     }
284
285     public DomContext getDomContext() {
286         return Objects.requireNonNull(domContext, "Initialization not completed for domContext");
287     }
288
289     public DataBroker getDataBroker() {
290         return dataBroker;
291     }
292
293     public DOMDataBroker getDOMDataBroker() {
294         return domDataBroker;
295     }
296
297     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
298         return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService");
299     }
300
301     @Override
302     public GetStatusOutputBuilder getStatus(GetStatusInput input) {
303         return new GetStatusOutputBuilder();
304     }
305
306     @Override
307     public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
308             final @NonNull L netconfNodeConnectListener) {
309         LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
310         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
311
312         return new ListenerRegistration<L>() {
313             @Override
314             public @NonNull L getInstance() {
315                 return netconfNodeConnectListener;
316             }
317
318             @Override
319             public void close() {
320                 LOG.info("Remove connect listener {}", netconfNodeConnectListener);
321                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
322             }
323         };
324     }
325
326     @Override
327     public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
328             @NonNull L netconfNodeStateListener) {
329         LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName());
330         netconfNodeStateListenerList.add(netconfNodeStateListener);
331
332         return new ListenerRegistration<L>() {
333             @Override
334             public @NonNull L getInstance() {
335                 return netconfNodeStateListener;
336             }
337
338             @Override
339             public void close() {
340                 LOG.info("Remove state listener {}", netconfNodeStateListener);
341                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
342             }
343         };
344     }
345
346     @Override
347     public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
348             @NonNull L vesNotificationListener) {
349         LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
350         vesNotificationListenerList.add(vesNotificationListener);
351
352         return new ListenerRegistration<L>() {
353             @Override
354             public @NonNull L getInstance() {
355                 return vesNotificationListener;
356             }
357
358             @Override
359             public void close() {
360                 LOG.info("Remove Ves notification listener {}", vesNotificationListener);
361                 vesNotificationListenerList.remove(vesNotificationListener);
362             }
363         };
364     }
365
366     @Override
367     public void close() {
368         LOG.info("Closing start ...");
369         try {
370             close(rpcApiService, listenerL1, listenerL2);
371         } catch (Exception e) {
372             LOG.debug("Closing", e);
373         }
374         LOG.info("Closing done");
375     }
376
377     /**
378      * Used to close all Services, that should support AutoCloseable Pattern
379      *
380      * @param toClose
381      * @throws Exception
382      */
383     private void close(AutoCloseable... toCloseList) throws Exception {
384         for (AutoCloseable element : toCloseList) {
385             if (element != null) {
386                 element.close();
387             }
388         }
389         this.configFileRepresentation.unregisterConfigChangedListener(this);
390     }
391
392     /**
393      * Indication if init() of this bundle successfully done.
394      *
395      * @return true if init() was successful. False if not done or not successful.
396      */
397     public boolean isInitializationSuccessful() {
398         return this.initializationSuccessful;
399     }
400
401     /*-------------------------------------------------------------------------------------------
402      * Functions for interface DeviceManagerService
403      */
404
405     /**
406      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
407      * Managed device is now fully connected to node/mountpoint.
408      *
409      * @param nNodeId id of the mountpoint
410      * @param netconfNode mountpoint contents
411      */
412     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
413
414         String mountPointNodeName = nNodeId.getValue();
415         LOG.info("Access connected state for mountpoint {}", mountPointNodeName);
416
417         boolean preConditionMissing = false;
418         if (mountPointService == null) {
419             preConditionMissing = true;
420             LOG.warn("No mountservice available.");
421         }
422         if (!initializationSuccessful) {
423             preConditionMissing = true;
424             LOG.warn("Devicemanager initialization still pending.");
425         }
426         if (preConditionMissing) {
427             return;
428         }
429
430         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
431         LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
432         if (isNetconfNodeMaster) {
433             NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
434             /*
435              * --> Call Listers for onConnect() Indication
436                for (all)
437              */
438             netconfNodeConnectListenerList.forEach(item -> {
439                 try {
440                     item.onEnterConnected(acessor);
441                 } catch (Exception e) {
442                     LOG.info("Exception during onEnterConnected listener call", e);
443                 }
444             });
445
446             LOG.info("Connect indication forwarded for {}", mountPointNodeName);
447         }
448     }
449
450     /**
451      * Leave the connected status to a non connected or removed status for master mountpoint
452      *
453      * @param action that occurred
454      * @param nNodeId id of the mountpoint
455      * @param netconfNode mountpoint contents or not available on remove
456      */
457     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
458         String mountPointNodeName = nNodeId.getValue();
459         LOG.info("leaveConnectedState id {}", mountPointNodeName);
460
461         if (this.accessorManager.containes(nNodeId)) {
462             netconfNodeConnectListenerList.forEach(item -> {
463                 try {
464                     if (item != null) {
465                         item.onLeaveConnected(nNodeId, optionalNetconfNode);
466                     } else {
467                         LOG.warn("Unexpeced null item during onleave");
468                     }
469                 } catch (Exception e) {
470                     LOG.info("Exception during onLeaveConnected listener call", e);
471                 }
472             });
473             LOG.info("Remove Master mountpoint {}", mountPointNodeName);
474             this.accessorManager.removeAccessor(nNodeId);
475         } else {
476             LOG.info("Master mountpoint already removed {}", mountPointNodeName);
477         }
478     }
479
480     // ---- onDataTreeChangedHandler
481
482     private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
483             ModificationType modificationTyp) {
484         // Move status into boolean flags for
485         boolean connectedBefore, connectedAfter, created;
486         NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
487         connectedAfter = isConnected(nNodeAfter);
488         if (root.getDataBefore() != null) {
489             // It is an update or delete
490             NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
491             connectedBefore = isConnected(nodeBefore);
492             created = false;
493         } else {
494             // It is a create
495             connectedBefore = false;
496             created = true;
497         }
498         LOG.info("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
499                 modificationTyp, created, connectedBefore, connectedAfter, isCluster,
500                 getClusteredConnectionStatus(nNodeAfter));
501         switch (modificationTyp) {
502             case SUBTREE_MODIFIED: // Create or modify sub level node
503             case WRITE: // Create or modify top level node
504                 // Treat an overwrite as an update
505                 // leaveConnected state.before = connected; state.after != connected
506                 // enterConnected state.after == connected
507                 // => Here create or update by checking root.getDataBefore() != null
508                 boolean handled = false;
509                 if (created) {
510                     handled = true;
511                     netconfNodeStateListenerList.forEach(item -> {
512                         try {
513                             item.onCreated(nodeId, nNodeAfter);
514                         } catch (Exception e) {
515                             LOG.info("Exception during onCreated listener call", e);
516                         }
517                     });
518                 }
519                 if (!connectedBefore && connectedAfter) {
520                     handled = true;
521                     enterConnectedState(nodeId, nNodeAfter);
522                 }
523                 if (connectedBefore && !connectedAfter) {
524                     handled = true;
525                     leaveConnectedState(nodeId, Optional.of(nNodeAfter));
526                 }
527                 if (!handled) {
528                     //Change if not handled by the messages before
529                     netconfNodeStateListenerList.forEach(item -> {
530                         try {
531                             item.onStateChange(nodeId, nNodeAfter);
532                         } catch (Exception e) {
533                             LOG.info("Exception during onStateChange listener call", e);
534                         }
535                     });
536                 }
537                 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
538                 break;
539             case DELETE:
540                 // Node removed
541                 // leaveconnected state.before = connected;
542                 if (!connectedBefore) {
543                     leaveConnectedState(nodeId, Optional.empty());
544                 }
545                 netconfNodeStateListenerList.forEach(item -> {
546                     try {
547                         item.onRemoved(nodeId);
548                     } catch (Exception e) {
549                         LOG.info("Exception during onRemoved listener call", e);
550                     }
551                 });
552                 // doProcessing(Action.REMOVE, nodeId, root);
553                 break;
554         }
555     }
556
557     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
558         for (final DataTreeModification<Node> change : changes) {
559
560             final DataObjectModification<Node> root = change.getRootNode();
561             if (LOG.isTraceEnabled()) {
562                 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
563                         change.getRootPath(), root);
564             }
565
566             // Catch potential nullpointer exceptions ..
567             try {
568                 ModificationType modificationTyp = root.getModificationType();
569                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
570                 NodeId nodeId = node != null ? node.getNodeId() : null;
571                 if (nodeId == null) {
572                     LOG.warn("L1 without nodeid.");
573                 } else {
574                     if (nodeId.equals(CONTROLLER)) {
575                         // Do not forward any controller related events to devicemanager
576                         LOG.debug("Stop processing for [{}]", nodeId);
577                     } else {
578                         if (modificationTyp == null) {
579                             LOG.warn("L1 empty modification type");
580                         } else {
581                             LOG.trace("handle data tree change with async={}",this.handleDataTreeAsync);
582                             if (this.handleDataTreeAsync) {
583                                 this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp));
584
585                             } else {
586                                 handleDataTreeChange(root, nodeId, modificationTyp);
587                             }
588                         }
589                     }
590                 }
591             } catch (NullPointerException | IllegalStateException e) {
592                 LOG.info("Data not available at ", e);
593             }
594         } //for
595         LOG.info("datatreechanged handler completed");
596     }
597
598     // ---- subclasses for listeners
599
600     /**
601      * Clustered listener function to select the right node from DataObjectModification. Called at all nodes.
602      */
603     private class L1 implements ClusteredDataTreeChangeListener<Node> {
604         @Override
605         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
606             LOG.info("L1 TreeChange enter changes:{}", changes.size());
607             //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
608             onDataTreeChangedHandler(changes);
609             LOG.info("L1 TreeChange leave");
610         }
611     }
612
613     /**
614      * Data change, called at leader/master
615      */
616     private class L2 implements DataTreeChangeListener<Node> {
617
618         @Override
619         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
620             LOG.info("L2 TreeChange enter changes:{}", changes.size());
621             // Do nothing
622             LOG.info("L2 TreeChange leave");
623         }
624     }
625
626     /* --- private helpers --- */
627     private static @Nullable NetconfNode getNetconfNode(Node node) {
628         return node != null ? node.augmentation(NetconfNode.class) : null;
629     }
630
631     private static boolean isConnected(NetconfNode nNode) {
632         return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
633     }
634
635     private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
636         return node != null ? node.getClusteredConnectionStatus() : null;
637     }
638
639     /* -- LOG related functions -- */
640
641     /** Analyze configuration **/
642     private static @Nullable AkkaConfig getAkkaConfig() {
643         AkkaConfig akkaConfig;
644         try {
645             akkaConfig = AkkaConfig.load();
646             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
647         } catch (Exception e1) {
648             akkaConfig = null;
649             LOG.warn("problem loading akka.conf: " + e1.getMessage());
650         }
651         if (akkaConfig != null && akkaConfig.isCluster()) {
652             LOG.info("cluster mode detected");
653             if (GeoConfig.fileExists()) {
654                 try {
655                     LOG.debug("try to load geoconfig");
656                     GeoConfig.load();
657                 } catch (Exception err) {
658                     LOG.warn("problem loading geoconfig: " + err.getMessage());
659                 }
660             } else {
661                 LOG.debug("no geoconfig file found");
662             }
663         } else {
664             LOG.info("single node mode detected");
665         }
666         return akkaConfig;
667     }
668
669     private boolean isNetconfNodeMaster(NetconfNode nNode) {
670         if (this.isCluster) {
671             LOG.debug("check if me is responsible for node");
672             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
673             @NonNull
674             String masterNodeName =
675                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
676             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
677             if (!masterNodeName.equals(clusterName)) {
678                 LOG.debug("netconf change but me is not master for this node");
679                 return false;
680             }
681         }
682         return true;
683     }
684
685
686
687     @Override
688     public void onConfigChanged() {
689         this.handleDataTreeAsync = this.config.handleAsync();
690         //setting poolsize is not possible atm
691         //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize());
692
693     }
694
695     public class NetconfChangeDataHolder {
696
697         protected final DataObjectModification<Node> root;
698         protected final ModificationType modificationTyp;
699
700         public NetconfChangeDataHolder(DataObjectModification<Node> root, ModificationType modificationTyp) {
701             this.root = root;
702             this.modificationTyp = modificationTyp;
703         }
704
705     }
706 }