d3752cdc49e78c539f73452b419715ee9f6356ef
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
19
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import javax.annotation.Nullable;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
32 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
33 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
34 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
36 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
37 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
38 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
39 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
40 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
41 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
46 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
47 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
48 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
49 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataObjectModification;
52 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
53 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
54 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
55 import org.opendaylight.mdsal.binding.api.DataTreeModification;
56 import org.opendaylight.mdsal.binding.api.MountPointService;
57 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
58 import org.opendaylight.mdsal.binding.api.RpcProviderService;
59 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
62 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
63 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
75 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
76 import org.opendaylight.yangtools.concepts.ListenerRegistration;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.model.parser.api.YangParserException;
79 import org.opendaylight.yangtools.yang.model.parser.api.YangParserFactory;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 public class NetconfNodeStateServiceImpl
84         implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
85
86     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
87     private static final String APPLICATION_NAME = "NetconfNodeStateService";
88     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
89
90     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
91             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
92                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
93
94     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
95             InstanceIdentifier.create(NetworkTopology.class)
96                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
97                     .child(Node.class);
98
99     private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
100             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
101
102     // Name of ODL controller NETCONF instance
103     private static final NodeId CONTROLLER = new NodeId("controller-config");
104
105     // -- OSGi services, provided
106     private DataBroker dataBroker;
107     private MountPointService mountPointService;
108     private DOMMountPointService domMountPointService;
109     private RpcProviderService rpcProviderRegistry;
110     private IEntityDataProvider iEntityDataProvider;
111     @SuppressWarnings("unused")
112     private NotificationPublishService notificationPublishService;
113     @SuppressWarnings("unused")
114     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
115     private YangParserFactory yangParserFactory;
116     private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
117
118     // -- Parameter
119     private ListenerRegistration<L1> listenerL1;
120     private ListenerRegistration<L2> listenerL2;
121     @SuppressWarnings("unused")
122     private ClusterSingletonServiceRegistration cssRegistration;
123
124     private NetconfnodeStateServiceRpcApiImpl rpcApiService;
125
126     /** Indication if init() function called and fully executed **/
127     private Boolean initializationSuccessful;
128
129     /** Manager accessor objects for connection **/
130     private NetconfAccessorManager accessorManager;
131
132     /** List of all registered listeners **/
133     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
134
135     /** List of all registered listeners **/
136     private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
137
138     /** List of all registered listeners **/
139     private final List<VesNotificationListener> vesNotificationListenerList;
140
141     /** Indicates if running in cluster configuration **/
142     private boolean isCluster;
143
144     /** Indicates the name of the cluster **/
145     private String clusterName;
146
147     /** nodeId to threadPool (size=1) for datatreechange handling) **/
148     private final Map<String, ExecutorService> handlingPool;
149
150     private boolean handleDataTreeAsync;
151
152     private ConfigurationFileRepresentation configFileRepresentation;
153     private NetconfStateConfig config;
154     private NetconfCommunicatorManager netconfCommunicatorManager;
155     private DomContext domContext;
156
157     /** Blueprint **/
158     public NetconfNodeStateServiceImpl() {
159         LOG.info("Creating provider for {}", APPLICATION_NAME);
160
161         this.dataBroker = null;
162         this.mountPointService = null;
163         this.domMountPointService = null;
164         this.rpcProviderRegistry = null;
165         this.notificationPublishService = null;
166         this.clusterSingletonServiceProvider = null;
167         this.yangParserFactory = null;
168         this.domContext = null;
169
170         this.listenerL1 = null;
171         this.listenerL2 = null;
172         this.initializationSuccessful = false;
173         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
174         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
175         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
176         this.accessorManager = null;
177         this.handlingPool = new HashMap<>();
178
179     }
180
181     public void setDataBroker(DataBroker dataBroker) {
182         this.dataBroker = dataBroker;
183     }
184
185     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
186         this.rpcProviderRegistry = rpcProviderRegistry;
187     }
188
189     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
190         this.notificationPublishService = notificationPublishService;
191     }
192
193     public void setMountPointService(MountPointService mountPointService) {
194         this.mountPointService = mountPointService;
195     }
196
197     public void setDomMountPointService(DOMMountPointService domMountPointService) {
198         this.domMountPointService = domMountPointService;
199     }
200
201     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
202         this.clusterSingletonServiceProvider = clusterSingletonService;
203     }
204
205     public void setEntityDataProvider(IEntityDataProvider iEntityDataProvider) {
206         this.iEntityDataProvider = iEntityDataProvider;
207     }
208
209     public void setYangParserFactory(YangParserFactory yangParserFactory) {
210         this.yangParserFactory = yangParserFactory;
211     }
212
213     public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
214         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
215     }
216
217     /** Blueprint initialization
218      * @throws YangParserException **/
219     public void init() {
220
221         LOG.info("Session Initiated start {}", APPLICATION_NAME);
222         this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
223         this.netconfCommunicatorManager = new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
224         this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
225         // Start RPC Service
226         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
227         // Get configuration
228         this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
229         this.config = new NetconfStateConfig(this.configFileRepresentation);
230         this.handleDataTreeAsync = this.config.handleAsync();
231         this.configFileRepresentation.registerConfigChangedListener(this);
232
233         // Akka setup
234         AkkaConfig akkaConfig = getAkkaConfig();
235         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
236         this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
237
238         // Provide status information
239         ClusterConfig cc = akkaConfig == null ? null : akkaConfig.getClusterConfig();
240         this.iEntityDataProvider.setStatus(StatusKey.CLUSTER_SIZE,
241                 cc == null ? "1" : String.format("%d", cc.getClusterSize()));
242
243         // RPC Service for specific services
244         this.rpcApiService.setStatusCallback(this);
245
246         LOG.debug("start NetconfSubscriptionManager Service");
247         //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
248         //this.netconfChangeListener.register();
249         //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
250
251         listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
252         listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
253
254         this.initializationSuccessful = true;
255
256         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
257
258     }
259
260     /** Blueprint destroy-method method */
261     public void destroy() {
262         close();
263     }
264
265     public DomContext getDomContext() {
266         return Objects.requireNonNull(domContext, "Initialization not completed for domContext" );
267     }
268
269     public DataBroker getDataBroker() {
270         return dataBroker;
271     }
272
273     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
274         return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService" );
275     }
276
277     @Override
278     public GetStatusOutputBuilder getStatus(GetStatusInput input) {
279         return new GetStatusOutputBuilder();
280     }
281
282     @Override
283     public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
284             final @NonNull L netconfNodeConnectListener) {
285         LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
286         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
287
288         return new ListenerRegistration<L>() {
289             @Override
290             public @NonNull L getInstance() {
291                 return netconfNodeConnectListener;
292             }
293
294             @Override
295             public void close() {
296                 LOG.info("Remove connect listener {}", netconfNodeConnectListener);
297                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
298             }
299         };
300     }
301
302     @Override
303     public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
304             @NonNull L netconfNodeStateListener) {
305         LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName());
306         netconfNodeStateListenerList.add(netconfNodeStateListener);
307
308         return new ListenerRegistration<L>() {
309             @Override
310             public @NonNull L getInstance() {
311                 return netconfNodeStateListener;
312             }
313
314             @Override
315             public void close() {
316                 LOG.info("Remove state listener {}", netconfNodeStateListener);
317                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
318             }
319         };
320     }
321
322     @Override
323     public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
324             @NonNull L vesNotificationListener) {
325         LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
326         vesNotificationListenerList.add(vesNotificationListener);
327
328         return new ListenerRegistration<L>() {
329             @Override
330             public @NonNull L getInstance() {
331                 return vesNotificationListener;
332             }
333
334             @Override
335             public void close() {
336                 LOG.info("Remove Ves notification listener {}", vesNotificationListener);
337                 vesNotificationListenerList.remove(vesNotificationListener);
338             }
339         };
340     }
341
342     @Override
343     public void close() {
344         LOG.info("Closing start ...");
345         try {
346             close(rpcApiService, listenerL1, listenerL2);
347         } catch (Exception e) {
348             LOG.debug("Closing", e);
349         }
350         LOG.info("Closing done");
351     }
352
353     /**
354      * Used to close all Services, that should support AutoCloseable Pattern
355      *
356      * @param toClose
357      * @throws Exception
358      */
359     private void close(AutoCloseable... toCloseList) throws Exception {
360         for (AutoCloseable element : toCloseList) {
361             if (element != null) {
362                 element.close();
363             }
364         }
365         this.configFileRepresentation.unregisterConfigChangedListener(this);
366     }
367
368     /**
369      * Indication if init() of this bundle successfully done.
370      *
371      * @return true if init() was successful. False if not done or not successful.
372      */
373     public boolean isInitializationSuccessful() {
374         return this.initializationSuccessful;
375     }
376
377     /*-------------------------------------------------------------------------------------------
378      * Functions for interface DeviceManagerService
379      */
380
381     /**
382      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
383      * Managed device is now fully connected to node/mountpoint.
384      *
385      * @param nNodeId id of the mountpoint
386      * @param netconfNode mountpoint contents
387      */
388     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
389
390         String mountPointNodeName = nNodeId.getValue();
391         LOG.info("Access connected state for mountpoint {}", mountPointNodeName);
392
393         boolean preConditionMissing = false;
394         if (mountPointService == null) {
395             preConditionMissing = true;
396             LOG.warn("No mountservice available.");
397         }
398         if (!initializationSuccessful) {
399             preConditionMissing = true;
400             LOG.warn("Devicemanager initialization still pending.");
401         }
402         if (preConditionMissing) {
403             return;
404         }
405
406         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
407         LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
408         if (isNetconfNodeMaster) {
409             NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
410                 /*
411                  * --> Call Listers for onConnect() Indication
412                    for (all)
413                  */
414                 netconfNodeConnectListenerList.forEach(item -> {
415                     try {
416                         item.onEnterConnected(acessor);
417                     } catch (Exception e) {
418                         LOG.info("Exception during onEnterConnected listener call", e);
419                     }
420                 });
421
422                 LOG.info("Connect indication forwarded for {}", mountPointNodeName);
423         }
424     }
425
426     /**
427      * Leave the connected status to a non connected or removed status for master mountpoint
428      *
429      * @param action that occurred
430      * @param nNodeId id of the mountpoint
431      * @param netconfNode mountpoint contents or not available on remove
432      */
433     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
434         String mountPointNodeName = nNodeId.getValue();
435         LOG.info("leaveConnectedState id {}", mountPointNodeName);
436
437         if (this.accessorManager.containes(nNodeId)) {
438             netconfNodeConnectListenerList.forEach(item -> {
439                 try {
440                     if (item != null) {
441                         item.onLeaveConnected(nNodeId, optionalNetconfNode);
442                     } else {
443                         LOG.warn("Unexpeced null item during onleave");
444                     }
445                 } catch (Exception e) {
446                     LOG.info("Exception during onLeaveConnected listener call", e);
447                 }
448             });
449             LOG.info("Remove Master mountpoint {}", mountPointNodeName);
450             this.accessorManager.removeAccessor(nNodeId);
451         } else {
452             LOG.info("Master mountpoint already removed {}", mountPointNodeName);
453         }
454     }
455
456     // ---- onDataTreeChangedHandler
457
458     private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
459             ModificationType modificationTyp) {
460         // Move status into boolean flags for
461         boolean connectedBefore, connectedAfter, created;
462         NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
463         connectedAfter = isConnected(nNodeAfter);
464         if (root.getDataBefore() != null) {
465             // It is an update or delete
466             NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
467             connectedBefore = isConnected(nodeBefore);
468             created = false;
469         } else {
470             // It is a create
471             connectedBefore = false;
472             created = true;
473         }
474         LOG.info("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
475                 modificationTyp, created, connectedBefore, connectedAfter, isCluster,
476                 getClusteredConnectionStatus(nNodeAfter));
477         switch (modificationTyp) {
478             case SUBTREE_MODIFIED: // Create or modify sub level node
479             case WRITE: // Create or modify top level node
480                 // Treat an overwrite as an update
481                 // leaveConnected state.before = connected; state.after != connected
482                 // enterConnected state.after == connected
483                 // => Here create or update by checking root.getDataBefore() != null
484                 boolean handled = false;
485                 if (created) {
486                     handled = true;
487                     netconfNodeStateListenerList.forEach(item -> {
488                         try {
489                             item.onCreated(nodeId, nNodeAfter);
490                         } catch (Exception e) {
491                             LOG.info("Exception during onCreated listener call", e);
492                         }
493                     });
494                 }
495                 if (!connectedBefore && connectedAfter) {
496                     handled = true;
497                     enterConnectedState(nodeId, nNodeAfter);
498                 }
499                 if (connectedBefore && !connectedAfter) {
500                     handled = true;
501                     leaveConnectedState(nodeId, Optional.of(nNodeAfter));
502                 }
503                 if (!handled) {
504                     //Change if not handled by the messages before
505                     netconfNodeStateListenerList.forEach(item -> {
506                         try {
507                             item.onStateChange(nodeId, nNodeAfter);
508                         } catch (Exception e) {
509                             LOG.info("Exception during onStateChange listener call", e);
510                         }
511                     });
512                 }
513                 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
514                 break;
515             case DELETE:
516                 // Node removed
517                 // leaveconnected state.before = connected;
518                 if (!connectedBefore) {
519                     leaveConnectedState(nodeId, Optional.empty());
520                 }
521                 netconfNodeStateListenerList.forEach(item -> {
522                     try {
523                         item.onRemoved(nodeId);
524                     } catch (Exception e) {
525                         LOG.info("Exception during onRemoved listener call", e);
526                     }
527                 });
528                 // doProcessing(Action.REMOVE, nodeId, root);
529                 break;
530         }
531     }
532
533     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
534         for (final DataTreeModification<Node> change : changes) {
535
536             final DataObjectModification<Node> root = change.getRootNode();
537             if (LOG.isTraceEnabled()) {
538                 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
539                         change.getRootPath(), root);
540             }
541
542             // Catch potential nullpointer exceptions ..
543             try {
544                 ModificationType modificationTyp = root.getModificationType();
545                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
546                 NodeId nodeId = node != null ? node.getNodeId() : null;
547                 if (nodeId == null) {
548                     LOG.warn("L1 without nodeid.");
549                 } else {
550                     if (nodeId.equals(CONTROLLER)) {
551                         // Do not forward any controller related events to devicemanager
552                         LOG.debug("Stop processing for [{}]", nodeId);
553                     } else {
554                         if (modificationTyp == null) {
555                             LOG.warn("L1 empty modification type");
556                         } else {
557                             if (this.handleDataTreeAsync) {
558                                 ExecutorService executor = this.handlingPool.getOrDefault(nodeId.getValue(), null);
559                                 if (executor == null) {
560                                     executor = Executors.newFixedThreadPool(5);
561                                     this.handlingPool.put(nodeId.getValue(), executor);
562                                 }
563                                 executor.execute(new Thread() {
564                                     @Override
565                                     public void run() {
566                                         handleDataTreeChange(root, nodeId, modificationTyp);
567                                     }
568                                 });
569
570                             } else {
571                                 handleDataTreeChange(root, nodeId, modificationTyp);
572                             }
573                         }
574                     }
575                 }
576             } catch (NullPointerException | IllegalStateException e) {
577                 LOG.info("Data not available at ", e);
578             }
579         } //for
580         LOG.info("datatreechanged handler completed");
581     }
582
583     // ---- subclasses for listeners
584
585     /**
586      * Clustered listener function to select the right node from DataObjectModification. Called at all nodes.
587      */
588     private class L1 implements ClusteredDataTreeChangeListener<Node> {
589         @Override
590         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
591             LOG.info("L1 TreeChange enter changes:{}", changes.size());
592             //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
593             onDataTreeChangedHandler(changes);
594             LOG.info("L1 TreeChange leave");
595         }
596     }
597
598     /**
599      * Data change, called at leader/master
600      */
601     private class L2 implements DataTreeChangeListener<Node> {
602
603         @Override
604         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
605             LOG.info("L2 TreeChange enter changes:{}", changes.size());
606             // Do nothing
607             LOG.info("L2 TreeChange leave");
608         }
609     }
610
611     /* --- private helpers --- */
612     private static @Nullable NetconfNode getNetconfNode(Node node) {
613         return node != null ? node.augmentation(NetconfNode.class) : null;
614     }
615
616     private static boolean isConnected(NetconfNode nNode) {
617         return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
618     }
619
620     private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
621         return node != null ? node.getClusteredConnectionStatus() : null;
622     }
623
624     /* -- LOG related functions -- */
625
626     /** Analyze configuration **/
627     private static @Nullable AkkaConfig getAkkaConfig() {
628         AkkaConfig akkaConfig;
629         try {
630             akkaConfig = AkkaConfig.load();
631             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
632         } catch (Exception e1) {
633             akkaConfig = null;
634             LOG.warn("problem loading akka.conf: " + e1.getMessage());
635         }
636         if (akkaConfig != null && akkaConfig.isCluster()) {
637             LOG.info("cluster mode detected");
638             if (GeoConfig.fileExists()) {
639                 try {
640                     LOG.debug("try to load geoconfig");
641                     GeoConfig.load();
642                 } catch (Exception err) {
643                     LOG.warn("problem loading geoconfig: " + err.getMessage());
644                 }
645             } else {
646                 LOG.debug("no geoconfig file found");
647             }
648         } else {
649             LOG.info("single node mode detected");
650         }
651         return akkaConfig;
652     }
653
654     private boolean isNetconfNodeMaster(NetconfNode nNode) {
655         if (this.isCluster) {
656             LOG.debug("check if me is responsible for node");
657             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
658             @NonNull
659             String masterNodeName =
660                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
661             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
662             if (!masterNodeName.equals(clusterName)) {
663                 LOG.debug("netconf change but me is not master for this node");
664                 return false;
665             }
666         }
667         return true;
668     }
669
670
671
672     @Override
673     public void onConfigChanged() {
674         this.handleDataTreeAsync = this.config.handleAsync();
675
676     }
677 }