migrate sdnr features to phosphorus
[ccsdk/features.git] / sdnr / wt / netconfnode-state-service / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / netconfnodestateservice / impl / NetconfNodeStateServiceImpl.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
19
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import javax.annotation.Nullable;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
32 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
33 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
34 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
36 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
37 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
38 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
39 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
40 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
41 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContext;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
46 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
47 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
48 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
49 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataObjectModification;
52 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
53 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
54 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
55 import org.opendaylight.mdsal.binding.api.DataTreeModification;
56 import org.opendaylight.mdsal.binding.api.MountPointService;
57 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
58 import org.opendaylight.mdsal.binding.api.RpcProviderService;
59 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
62 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
63 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
64 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
70 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
75 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
76 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
77 import org.opendaylight.yangtools.concepts.ListenerRegistration;
78 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
79 import org.opendaylight.yangtools.yang.parser.api.YangParserException;
80 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
83
84 public class NetconfNodeStateServiceImpl
85         implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
86
87     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
88     private static final String APPLICATION_NAME = "NetconfNodeStateService";
89     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
90
91     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
92             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
93                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
94
95     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
96             InstanceIdentifier.create(NetworkTopology.class)
97                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
98                     .child(Node.class);
99
100     private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
101             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
102
103     // Name of ODL controller NETCONF instance
104     private static final NodeId CONTROLLER = new NodeId("controller-config");
105
106     // -- OSGi services, provided
107     private DataBroker dataBroker;
108     private DOMDataBroker domDataBroker;
109     private MountPointService mountPointService;
110     private DOMMountPointService domMountPointService;
111     private RpcProviderService rpcProviderRegistry;
112     private IEntityDataProvider iEntityDataProvider;
113     @SuppressWarnings("unused")
114     private NotificationPublishService notificationPublishService;
115     @SuppressWarnings("unused")
116     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
117     private YangParserFactory yangParserFactory;
118     private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
119
120     // -- Parameter
121     private ListenerRegistration<L1> listenerL1;
122     private ListenerRegistration<L2> listenerL2;
123     @SuppressWarnings("unused")
124     private ClusterSingletonServiceRegistration cssRegistration;
125
126     private NetconfnodeStateServiceRpcApiImpl rpcApiService;
127
128     /** Indication if init() function called and fully executed **/
129     private Boolean initializationSuccessful;
130
131     /** Manager accessor objects for connection **/
132     private NetconfAccessorManager accessorManager;
133
134     /** List of all registered listeners **/
135     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
136
137     /** List of all registered listeners **/
138     private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
139
140     /** List of all registered listeners **/
141     private final List<VesNotificationListener> vesNotificationListenerList;
142
143     /** Indicates if running in cluster configuration **/
144     private boolean isCluster;
145
146     /** Indicates the name of the cluster **/
147     private String clusterName;
148
149     /** nodeId to threadPool (size=1) for datatreechange handling) **/
150     private final Map<String, ExecutorService> handlingPool;
151
152     private boolean handleDataTreeAsync;
153
154     private ConfigurationFileRepresentation configFileRepresentation;
155     private NetconfStateConfig config;
156     private NetconfCommunicatorManager netconfCommunicatorManager;
157     private DomContext domContext;
158
159     /** Blueprint **/
160     public NetconfNodeStateServiceImpl() {
161         LOG.info("Creating provider for {}", APPLICATION_NAME);
162
163         this.dataBroker = null;
164         this.domDataBroker = null;
165         this.mountPointService = null;
166         this.domMountPointService = null;
167         this.rpcProviderRegistry = null;
168         this.notificationPublishService = null;
169         this.clusterSingletonServiceProvider = null;
170         this.yangParserFactory = null;
171         this.domContext = null;
172
173         this.listenerL1 = null;
174         this.listenerL2 = null;
175         this.initializationSuccessful = false;
176         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
177         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
178         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
179         this.accessorManager = null;
180         this.handlingPool = new HashMap<>();
181
182     }
183
184     public void setDataBroker(DataBroker dataBroker) {
185         this.dataBroker = dataBroker;
186     }
187
188     public void setDomDataBroker(DOMDataBroker domDataBroker) {
189         this.domDataBroker = domDataBroker;
190     }
191
192     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
193         this.rpcProviderRegistry = rpcProviderRegistry;
194     }
195
196     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
197         this.notificationPublishService = notificationPublishService;
198     }
199
200     public void setMountPointService(MountPointService mountPointService) {
201         this.mountPointService = mountPointService;
202     }
203
204     public void setDomMountPointService(DOMMountPointService domMountPointService) {
205         this.domMountPointService = domMountPointService;
206     }
207
208     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
209         this.clusterSingletonServiceProvider = clusterSingletonService;
210     }
211
212     public void setEntityDataProvider(IEntityDataProvider iEntityDataProvider) {
213         this.iEntityDataProvider = iEntityDataProvider;
214     }
215
216     public void setYangParserFactory(YangParserFactory yangParserFactory) {
217         this.yangParserFactory = yangParserFactory;
218     }
219
220     public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
221         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
222     }
223
224     /** Blueprint initialization
225      * @throws YangParserException **/
226     public void init() {
227
228         LOG.info("Session Initiated start {}", APPLICATION_NAME);
229         this.domContext = new DomContext(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
230         this.netconfCommunicatorManager = new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
231         this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
232         // Start RPC Service
233         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
234         // Get configuration
235         this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
236         this.config = new NetconfStateConfig(this.configFileRepresentation);
237         this.handleDataTreeAsync = this.config.handleAsync();
238         this.configFileRepresentation.registerConfigChangedListener(this);
239
240         // Akka setup
241         AkkaConfig akkaConfig = getAkkaConfig();
242         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
243         this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
244
245         // Provide status information
246         ClusterConfig cc = akkaConfig == null ? null : akkaConfig.getClusterConfig();
247         this.iEntityDataProvider.setStatus(StatusKey.CLUSTER_SIZE,
248                 cc == null ? "1" : String.format("%d", cc.getClusterSize()));
249
250         // RPC Service for specific services
251         this.rpcApiService.setStatusCallback(this);
252
253         LOG.debug("start NetconfSubscriptionManager Service");
254         //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
255         //this.netconfChangeListener.register();
256         //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
257
258         listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
259         listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
260
261         this.initializationSuccessful = true;
262
263         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
264
265     }
266
267     /** Blueprint destroy-method method */
268     public void destroy() {
269         close();
270     }
271
272     public DomContext getDomContext() {
273         return Objects.requireNonNull(domContext, "Initialization not completed for domContext" );
274     }
275
276     public DataBroker getDataBroker() {
277         return dataBroker;
278     }
279
280     public DOMDataBroker getDOMDataBroker() {
281         return domDataBroker;
282     }
283
284     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
285         return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService" );
286     }
287
288     @Override
289     public GetStatusOutputBuilder getStatus(GetStatusInput input) {
290         return new GetStatusOutputBuilder();
291     }
292
293     @Override
294     public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
295             final @NonNull L netconfNodeConnectListener) {
296         LOG.info("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
297         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
298
299         return new ListenerRegistration<L>() {
300             @Override
301             public @NonNull L getInstance() {
302                 return netconfNodeConnectListener;
303             }
304
305             @Override
306             public void close() {
307                 LOG.info("Remove connect listener {}", netconfNodeConnectListener);
308                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
309             }
310         };
311     }
312
313     @Override
314     public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
315             @NonNull L netconfNodeStateListener) {
316         LOG.info("Register state listener {}", netconfNodeStateListener.getClass().getName());
317         netconfNodeStateListenerList.add(netconfNodeStateListener);
318
319         return new ListenerRegistration<L>() {
320             @Override
321             public @NonNull L getInstance() {
322                 return netconfNodeStateListener;
323             }
324
325             @Override
326             public void close() {
327                 LOG.info("Remove state listener {}", netconfNodeStateListener);
328                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
329             }
330         };
331     }
332
333     @Override
334     public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
335             @NonNull L vesNotificationListener) {
336         LOG.info("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
337         vesNotificationListenerList.add(vesNotificationListener);
338
339         return new ListenerRegistration<L>() {
340             @Override
341             public @NonNull L getInstance() {
342                 return vesNotificationListener;
343             }
344
345             @Override
346             public void close() {
347                 LOG.info("Remove Ves notification listener {}", vesNotificationListener);
348                 vesNotificationListenerList.remove(vesNotificationListener);
349             }
350         };
351     }
352
353     @Override
354     public void close() {
355         LOG.info("Closing start ...");
356         try {
357             close(rpcApiService, listenerL1, listenerL2);
358         } catch (Exception e) {
359             LOG.debug("Closing", e);
360         }
361         LOG.info("Closing done");
362     }
363
364     /**
365      * Used to close all Services, that should support AutoCloseable Pattern
366      *
367      * @param toClose
368      * @throws Exception
369      */
370     private void close(AutoCloseable... toCloseList) throws Exception {
371         for (AutoCloseable element : toCloseList) {
372             if (element != null) {
373                 element.close();
374             }
375         }
376         this.configFileRepresentation.unregisterConfigChangedListener(this);
377     }
378
379     /**
380      * Indication if init() of this bundle successfully done.
381      *
382      * @return true if init() was successful. False if not done or not successful.
383      */
384     public boolean isInitializationSuccessful() {
385         return this.initializationSuccessful;
386     }
387
388     /*-------------------------------------------------------------------------------------------
389      * Functions for interface DeviceManagerService
390      */
391
392     /**
393      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
394      * Managed device is now fully connected to node/mountpoint.
395      *
396      * @param nNodeId id of the mountpoint
397      * @param netconfNode mountpoint contents
398      */
399     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
400
401         String mountPointNodeName = nNodeId.getValue();
402         LOG.info("Access connected state for mountpoint {}", mountPointNodeName);
403
404         boolean preConditionMissing = false;
405         if (mountPointService == null) {
406             preConditionMissing = true;
407             LOG.warn("No mountservice available.");
408         }
409         if (!initializationSuccessful) {
410             preConditionMissing = true;
411             LOG.warn("Devicemanager initialization still pending.");
412         }
413         if (preConditionMissing) {
414             return;
415         }
416
417         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
418         LOG.info("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
419         if (isNetconfNodeMaster) {
420             NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
421                 /*
422                  * --> Call Listers for onConnect() Indication
423                    for (all)
424                  */
425                 netconfNodeConnectListenerList.forEach(item -> {
426                     try {
427                         item.onEnterConnected(acessor);
428                     } catch (Exception e) {
429                         LOG.info("Exception during onEnterConnected listener call", e);
430                     }
431                 });
432
433                 LOG.info("Connect indication forwarded for {}", mountPointNodeName);
434         }
435     }
436
437     /**
438      * Leave the connected status to a non connected or removed status for master mountpoint
439      *
440      * @param action that occurred
441      * @param nNodeId id of the mountpoint
442      * @param netconfNode mountpoint contents or not available on remove
443      */
444     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
445         String mountPointNodeName = nNodeId.getValue();
446         LOG.info("leaveConnectedState id {}", mountPointNodeName);
447
448         if (this.accessorManager.containes(nNodeId)) {
449             netconfNodeConnectListenerList.forEach(item -> {
450                 try {
451                     if (item != null) {
452                         item.onLeaveConnected(nNodeId, optionalNetconfNode);
453                     } else {
454                         LOG.warn("Unexpeced null item during onleave");
455                     }
456                 } catch (Exception e) {
457                     LOG.info("Exception during onLeaveConnected listener call", e);
458                 }
459             });
460             LOG.info("Remove Master mountpoint {}", mountPointNodeName);
461             this.accessorManager.removeAccessor(nNodeId);
462         } else {
463             LOG.info("Master mountpoint already removed {}", mountPointNodeName);
464         }
465     }
466
467     // ---- onDataTreeChangedHandler
468
469     private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
470             ModificationType modificationTyp) {
471         // Move status into boolean flags for
472         boolean connectedBefore, connectedAfter, created;
473         NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
474         connectedAfter = isConnected(nNodeAfter);
475         if (root.getDataBefore() != null) {
476             // It is an update or delete
477             NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
478             connectedBefore = isConnected(nodeBefore);
479             created = false;
480         } else {
481             // It is a create
482             connectedBefore = false;
483             created = true;
484         }
485         LOG.info("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
486                 modificationTyp, created, connectedBefore, connectedAfter, isCluster,
487                 getClusteredConnectionStatus(nNodeAfter));
488         switch (modificationTyp) {
489             case SUBTREE_MODIFIED: // Create or modify sub level node
490             case WRITE: // Create or modify top level node
491                 // Treat an overwrite as an update
492                 // leaveConnected state.before = connected; state.after != connected
493                 // enterConnected state.after == connected
494                 // => Here create or update by checking root.getDataBefore() != null
495                 boolean handled = false;
496                 if (created) {
497                     handled = true;
498                     netconfNodeStateListenerList.forEach(item -> {
499                         try {
500                             item.onCreated(nodeId, nNodeAfter);
501                         } catch (Exception e) {
502                             LOG.info("Exception during onCreated listener call", e);
503                         }
504                     });
505                 }
506                 if (!connectedBefore && connectedAfter) {
507                     handled = true;
508                     enterConnectedState(nodeId, nNodeAfter);
509                 }
510                 if (connectedBefore && !connectedAfter) {
511                     handled = true;
512                     leaveConnectedState(nodeId, Optional.of(nNodeAfter));
513                 }
514                 if (!handled) {
515                     //Change if not handled by the messages before
516                     netconfNodeStateListenerList.forEach(item -> {
517                         try {
518                             item.onStateChange(nodeId, nNodeAfter);
519                         } catch (Exception e) {
520                             LOG.info("Exception during onStateChange listener call", e);
521                         }
522                     });
523                 }
524                 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
525                 break;
526             case DELETE:
527                 // Node removed
528                 // leaveconnected state.before = connected;
529                 if (!connectedBefore) {
530                     leaveConnectedState(nodeId, Optional.empty());
531                 }
532                 netconfNodeStateListenerList.forEach(item -> {
533                     try {
534                         item.onRemoved(nodeId);
535                     } catch (Exception e) {
536                         LOG.info("Exception during onRemoved listener call", e);
537                     }
538                 });
539                 // doProcessing(Action.REMOVE, nodeId, root);
540                 break;
541         }
542     }
543
544     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
545         for (final DataTreeModification<Node> change : changes) {
546
547             final DataObjectModification<Node> root = change.getRootNode();
548             if (LOG.isTraceEnabled()) {
549                 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
550                         change.getRootPath(), root);
551             }
552
553             // Catch potential nullpointer exceptions ..
554             try {
555                 ModificationType modificationTyp = root.getModificationType();
556                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
557                 NodeId nodeId = node != null ? node.getNodeId() : null;
558                 if (nodeId == null) {
559                     LOG.warn("L1 without nodeid.");
560                 } else {
561                     if (nodeId.equals(CONTROLLER)) {
562                         // Do not forward any controller related events to devicemanager
563                         LOG.debug("Stop processing for [{}]", nodeId);
564                     } else {
565                         if (modificationTyp == null) {
566                             LOG.warn("L1 empty modification type");
567                         } else {
568                             if (this.handleDataTreeAsync) {
569                                 ExecutorService executor = this.handlingPool.getOrDefault(nodeId.getValue(), null);
570                                 if (executor == null) {
571                                     executor = Executors.newFixedThreadPool(5);
572                                     this.handlingPool.put(nodeId.getValue(), executor);
573                                 }
574                                 executor.execute(new Thread() {
575                                     @Override
576                                     public void run() {
577                                         handleDataTreeChange(root, nodeId, modificationTyp);
578                                     }
579                                 });
580
581                             } else {
582                                 handleDataTreeChange(root, nodeId, modificationTyp);
583                             }
584                         }
585                     }
586                 }
587             } catch (NullPointerException | IllegalStateException e) {
588                 LOG.info("Data not available at ", e);
589             }
590         } //for
591         LOG.info("datatreechanged handler completed");
592     }
593
594     // ---- subclasses for listeners
595
596     /**
597      * Clustered listener function to select the right node from DataObjectModification. Called at all nodes.
598      */
599     private class L1 implements ClusteredDataTreeChangeListener<Node> {
600         @Override
601         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
602             LOG.info("L1 TreeChange enter changes:{}", changes.size());
603             //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
604             onDataTreeChangedHandler(changes);
605             LOG.info("L1 TreeChange leave");
606         }
607     }
608
609     /**
610      * Data change, called at leader/master
611      */
612     private class L2 implements DataTreeChangeListener<Node> {
613
614         @Override
615         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
616             LOG.info("L2 TreeChange enter changes:{}", changes.size());
617             // Do nothing
618             LOG.info("L2 TreeChange leave");
619         }
620     }
621
622     /* --- private helpers --- */
623     private static @Nullable NetconfNode getNetconfNode(Node node) {
624         return node != null ? node.augmentation(NetconfNode.class) : null;
625     }
626
627     private static boolean isConnected(NetconfNode nNode) {
628         return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
629     }
630
631     private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
632         return node != null ? node.getClusteredConnectionStatus() : null;
633     }
634
635     /* -- LOG related functions -- */
636
637     /** Analyze configuration **/
638     private static @Nullable AkkaConfig getAkkaConfig() {
639         AkkaConfig akkaConfig;
640         try {
641             akkaConfig = AkkaConfig.load();
642             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
643         } catch (Exception e1) {
644             akkaConfig = null;
645             LOG.warn("problem loading akka.conf: " + e1.getMessage());
646         }
647         if (akkaConfig != null && akkaConfig.isCluster()) {
648             LOG.info("cluster mode detected");
649             if (GeoConfig.fileExists()) {
650                 try {
651                     LOG.debug("try to load geoconfig");
652                     GeoConfig.load();
653                 } catch (Exception err) {
654                     LOG.warn("problem loading geoconfig: " + err.getMessage());
655                 }
656             } else {
657                 LOG.debug("no geoconfig file found");
658             }
659         } else {
660             LOG.info("single node mode detected");
661         }
662         return akkaConfig;
663     }
664
665     private boolean isNetconfNodeMaster(NetconfNode nNode) {
666         if (this.isCluster) {
667             LOG.debug("check if me is responsible for node");
668             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
669             @NonNull
670             String masterNodeName =
671                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
672             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
673             if (!masterNodeName.equals(clusterName)) {
674                 LOG.debug("netconf change but me is not master for this node");
675                 return false;
676             }
677         }
678         return true;
679     }
680
681
682
683     @Override
684     public void onConfigChanged() {
685         this.handleDataTreeAsync = this.config.handleAsync();
686
687     }
688
689 }