c4ec06ff8f870c6cd8ae9368a0942650d9982372
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
19
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import javax.annotation.Nullable;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
28 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
29 import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory;
30 import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool;
31 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
32 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
33 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.DomContext;
34 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
36 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
37 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
38 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
39 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
40 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
41 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContextImpl;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
46 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
47 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
48 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
49 import org.opendaylight.mdsal.binding.api.DataBroker;
50 import org.opendaylight.mdsal.binding.api.DataObjectModification;
51 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
52 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
53 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
54 import org.opendaylight.mdsal.binding.api.DataTreeModification;
55 import org.opendaylight.mdsal.binding.api.MountPointService;
56 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
57 import org.opendaylight.mdsal.binding.api.RpcProviderService;
58 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
59 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
60 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
61 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
62 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
63 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.ConnectionOper.ConnectionStatus;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.connection.oper.ClusteredConnectionStatus;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.network.topology.topology.topology.types.TopologyNetconf;
68 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
73 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
74 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
75 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
76 import org.opendaylight.yangtools.concepts.ListenerRegistration;
77 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
78 import org.opendaylight.yangtools.yang.parser.api.YangParserException;
79 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83 public class NetconfNodeStateServiceImpl
84         implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
85
86     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
87     private static final String APPLICATION_NAME = "NetconfNodeStateService";
88     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
89
90     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
91             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
92                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
93
94     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
95             InstanceIdentifier.create(NetworkTopology.class)
96                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
97                     .child(Node.class);
98
99     private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
100             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
101
102     // Name of ODL controller NETCONF instance
103     private static final NodeId CONTROLLER = new NodeId("controller-config");
104     private static final int ASYNC_EXECUTION_POOLSIZE = 20;
105
106     // -- OSGi services, provided
107     private DataBroker dataBroker;
108     private DOMDataBroker domDataBroker;
109     private MountPointService mountPointService;
110     private DOMMountPointService domMountPointService;
111     private RpcProviderService rpcProviderRegistry;
112     private IEntityDataProvider iEntityDataProvider;
113     @SuppressWarnings("unused")
114     private NotificationPublishService notificationPublishService;
115     @SuppressWarnings("unused")
116     private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
117     private YangParserFactory yangParserFactory;
118     private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
119
120     // -- Parameter
121     private ListenerRegistration<L1> listenerL1;
122     private ListenerRegistration<L2> listenerL2;
123     @SuppressWarnings("unused")
124     private ClusterSingletonServiceRegistration cssRegistration;
125
126     private NetconfnodeStateServiceRpcApiImpl rpcApiService;
127
128     /** Indication if init() function called and fully executed **/
129     private Boolean initializationSuccessful;
130
131     /** Manager accessor objects for connection **/
132     private NetconfAccessorManager accessorManager;
133
134     /** List of all registered listeners **/
135     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
136
137     /** List of all registered listeners **/
138     private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
139
140     /** List of all registered listeners **/
141     private final List<VesNotificationListener> vesNotificationListenerList;
142
143     /** Indicates if running in cluster configuration **/
144     private boolean isCluster;
145
146     /** Indicates the name of the cluster **/
147     private String clusterName;
148
149     /** nodeId to threadPool (size=1) for datatreechange handling) **/
150     //    private final Map<String, ExecutorService> handlingPool;
151     private KeyBasedThreadpool<NodeId, NetconfChangeDataHolder> handlingPool;
152
153     private boolean handleDataTreeAsync;
154
155     private ConfigurationFileRepresentation configFileRepresentation;
156     private NetconfStateConfig config;
157     private NetconfCommunicatorManager netconfCommunicatorManager;
158     private DomContext domContext;
159
160     /** Blueprint **/
161     public NetconfNodeStateServiceImpl() {
162         LOG.info("Creating provider for {}", APPLICATION_NAME);
163
164         this.dataBroker = null;
165         this.domDataBroker = null;
166         this.mountPointService = null;
167         this.domMountPointService = null;
168         this.rpcProviderRegistry = null;
169         this.notificationPublishService = null;
170         this.clusterSingletonServiceProvider = null;
171         this.yangParserFactory = null;
172         this.domContext = null;
173
174         this.listenerL1 = null;
175         this.listenerL2 = null;
176         this.initializationSuccessful = false;
177         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
178         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
179         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
180         this.accessorManager = null;
181         this.handlingPool = null;
182     }
183     public void setDataBroker(DataBroker dataBroker) {
184         this.dataBroker = dataBroker;
185     }
186
187     public void setDomDataBroker(DOMDataBroker domDataBroker) {
188         this.domDataBroker = domDataBroker;
189     }
190
191     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
192         this.rpcProviderRegistry = rpcProviderRegistry;
193     }
194
195     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
196         this.notificationPublishService = notificationPublishService;
197     }
198
199     public void setMountPointService(MountPointService mountPointService) {
200         this.mountPointService = mountPointService;
201     }
202
203     public void setDomMountPointService(DOMMountPointService domMountPointService) {
204         this.domMountPointService = domMountPointService;
205     }
206
207     public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
208         this.clusterSingletonServiceProvider = clusterSingletonService;
209     }
210
211     public void setEntityDataProvider(IEntityDataProvider iEntityDataProvider) {
212         this.iEntityDataProvider = iEntityDataProvider;
213     }
214
215     public void setYangParserFactory(YangParserFactory yangParserFactory) {
216         this.yangParserFactory = yangParserFactory;
217     }
218
219     public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
220         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
221     }
222
223     /**
224      * Blueprint initialization
225      *
226      * @throws YangParserException
227      **/
228     public void init() {
229
230         LOG.info("Session Initiated start {}", APPLICATION_NAME);
231         this.domContext = new DomContextImpl(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
232         this.netconfCommunicatorManager =
233                 new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
234         this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
235         // Start RPC Service
236         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
237         // Get configuration
238         this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
239         this.config = new NetconfStateConfig(this.configFileRepresentation);
240         this.handleDataTreeAsync = this.config.handleAsync();
241         this.configFileRepresentation.registerConfigChangedListener(this);
242
243         // Akka setup
244         AkkaConfig akkaConfig = getAkkaConfig();
245         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
246         this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
247
248         // Provide status information
249         ClusterConfig cc = akkaConfig == null ? null : akkaConfig.getClusterConfig();
250         this.iEntityDataProvider.setStatus(StatusKey.CLUSTER_SIZE,
251                 cc == null ? "1" : String.format("%d", cc.getClusterSize()));
252
253         // RPC Service for specific services
254         this.rpcApiService.setStatusCallback(this);
255
256         LOG.debug("start NetconfSubscriptionManager Service");
257         //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
258         //this.netconfChangeListener.register();
259         //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
260
261         listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
262         listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
263         this.handlingPool = new KeyBasedThreadpool<NodeId, NetconfChangeDataHolder>(this.config.getAsyncHandlingPoolsize(), 1,
264                 new GenericRunnableFactory<>() {
265                     public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) {
266                         return new Runnable() {
267
268                             @Override
269                             public void run() {
270                                 NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key,
271                                         arg.modificationTyp);
272                             }
273                         };
274                     };
275                 });
276         this.initializationSuccessful = true;
277
278         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
279
280     }
281
282     /** Blueprint destroy-method method */
283     public void destroy() {
284         close();
285     }
286
287     public DomContext getDomContext() {
288         return Objects.requireNonNull(domContext, "Initialization not completed for domContext");
289     }
290
291     public DataBroker getDataBroker() {
292         return dataBroker;
293     }
294
295     public DOMDataBroker getDOMDataBroker() {
296         return domDataBroker;
297     }
298
299     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
300         return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService");
301     }
302
303     @Override
304     public GetStatusOutputBuilder getStatus(GetStatusInput input) {
305         return new GetStatusOutputBuilder();
306     }
307
308     @Override
309     public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
310             final @NonNull L netconfNodeConnectListener) {
311         LOG.debug("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
312         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
313
314         return new ListenerRegistration<L>() {
315             @Override
316             public @NonNull L getInstance() {
317                 return netconfNodeConnectListener;
318             }
319
320             @Override
321             public void close() {
322                 LOG.debug("Remove connect listener {}", netconfNodeConnectListener);
323                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
324             }
325         };
326     }
327
328     @Override
329     public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
330             @NonNull L netconfNodeStateListener) {
331         LOG.debug("Register state listener {}", netconfNodeStateListener.getClass().getName());
332         netconfNodeStateListenerList.add(netconfNodeStateListener);
333
334         return new ListenerRegistration<L>() {
335             @Override
336             public @NonNull L getInstance() {
337                 return netconfNodeStateListener;
338             }
339
340             @Override
341             public void close() {
342                 LOG.debug("Remove state listener {}", netconfNodeStateListener);
343                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
344             }
345         };
346     }
347
348     @Override
349     public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
350             @NonNull L vesNotificationListener) {
351         LOG.debug("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
352         vesNotificationListenerList.add(vesNotificationListener);
353
354         return new ListenerRegistration<L>() {
355             @Override
356             public @NonNull L getInstance() {
357                 return vesNotificationListener;
358             }
359
360             @Override
361             public void close() {
362                 LOG.debug("Remove Ves notification listener {}", vesNotificationListener);
363                 vesNotificationListenerList.remove(vesNotificationListener);
364             }
365         };
366     }
367
368     @Override
369     public void close() {
370         LOG.info("Closing start ...");
371         try {
372             close(rpcApiService, listenerL1, listenerL2);
373         } catch (Exception e) {
374             LOG.debug("Closing", e);
375         }
376         LOG.info("Closing done");
377     }
378
379     /**
380      * Used to close all Services, that should support AutoCloseable Pattern
381      *
382      * @param toClose
383      * @throws Exception
384      */
385     private void close(AutoCloseable... toCloseList) throws Exception {
386         for (AutoCloseable element : toCloseList) {
387             if (element != null) {
388                 element.close();
389             }
390         }
391         this.configFileRepresentation.unregisterConfigChangedListener(this);
392     }
393
394     /**
395      * Indication if init() of this bundle successfully done.
396      *
397      * @return true if init() was successful. False if not done or not successful.
398      */
399     public boolean isInitializationSuccessful() {
400         return this.initializationSuccessful;
401     }
402
403     /*-------------------------------------------------------------------------------------------
404      * Functions for interface DeviceManagerService
405      */
406
407     /**
408      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
409      * Managed device is now fully connected to node/mountpoint.
410      *
411      * @param nNodeId id of the mountpoint
412      * @param netconfNode mountpoint contents
413      */
414     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
415
416         String mountPointNodeName = nNodeId.getValue();
417         LOG.debug("Access connected state for mountpoint {}", mountPointNodeName);
418
419         boolean preConditionMissing = false;
420         if (mountPointService == null) {
421             preConditionMissing = true;
422             LOG.warn("No mountservice available.");
423         }
424         if (!initializationSuccessful) {
425             preConditionMissing = true;
426             LOG.warn("Devicemanager initialization still pending.");
427         }
428         if (preConditionMissing) {
429             return;
430         }
431
432         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
433         LOG.debug("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
434         if (isNetconfNodeMaster) {
435             NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
436             /*
437              * --> Call Listers for onConnect() Indication
438                for (all)
439              */
440             netconfNodeConnectListenerList.forEach(item -> {
441                 try {
442                     item.onEnterConnected(acessor);
443                 } catch (Exception e) {
444                     LOG.debug("Exception during onEnterConnected listener call", e);
445                 }
446             });
447
448             LOG.debug("Connect indication forwarded for {}", mountPointNodeName);
449         }
450     }
451
452     /**
453      * Leave the connected status to a non connected or removed status for master mountpoint
454      *
455      * @param action that occurred
456      * @param nNodeId id of the mountpoint
457      * @param netconfNode mountpoint contents or not available on remove
458      */
459     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
460         String mountPointNodeName = nNodeId.getValue();
461         LOG.debug("leaveConnectedState id {}", mountPointNodeName);
462
463         if (this.accessorManager.containes(nNodeId)) {
464             netconfNodeConnectListenerList.forEach(item -> {
465                 try {
466                     if (item != null) {
467                         item.onLeaveConnected(nNodeId, optionalNetconfNode);
468                     } else {
469                         LOG.warn("Unexpeced null item during onleave");
470                     }
471                 } catch (Exception e) {
472                     LOG.debug("Exception during onLeaveConnected listener call", e);
473                 }
474             });
475             LOG.debug("Remove Master mountpoint {}", mountPointNodeName);
476             this.accessorManager.removeAccessor(nNodeId);
477         } else {
478             LOG.debug("Master mountpoint already removed {}", mountPointNodeName);
479         }
480     }
481
482     // ---- onDataTreeChangedHandler
483
484     private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
485             ModificationType modificationTyp) {
486         // Move status into boolean flags for
487         boolean connectedBefore, connectedAfter, created;
488         NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
489         connectedAfter = isConnected(nNodeAfter);
490         if (root.getDataBefore() != null) {
491             // It is an update or delete
492             NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
493             connectedBefore = isConnected(nodeBefore);
494             created = false;
495         } else {
496             // It is a create
497             connectedBefore = false;
498             created = true;
499         }
500         LOG.debug("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
501                 modificationTyp, created, connectedBefore, connectedAfter, isCluster,
502                 getClusteredConnectionStatus(nNodeAfter));
503         switch (modificationTyp) {
504             case SUBTREE_MODIFIED: // Create or modify sub level node
505             case WRITE: // Create or modify top level node
506                 // Treat an overwrite as an update
507                 // leaveConnected state.before = connected; state.after != connected
508                 // enterConnected state.after == connected
509                 // => Here create or update by checking root.getDataBefore() != null
510                 boolean handled = false;
511                 if (created) {
512                     handled = true;
513                     netconfNodeStateListenerList.forEach(item -> {
514                         try {
515                             item.onCreated(nodeId, nNodeAfter);
516                         } catch (Exception e) {
517                             LOG.info("Exception during onCreated listener call", e);
518                         }
519                     });
520                 }
521                 if (!connectedBefore && connectedAfter) {
522                     handled = true;
523                     enterConnectedState(nodeId, nNodeAfter);
524                 }
525                 if (connectedBefore && !connectedAfter) {
526                     handled = true;
527                     leaveConnectedState(nodeId, Optional.of(nNodeAfter));
528                 }
529                 if (!handled) {
530                     //Change if not handled by the messages before
531                     netconfNodeStateListenerList.forEach(item -> {
532                         try {
533                             item.onStateChange(nodeId, nNodeAfter);
534                         } catch (Exception e) {
535                             LOG.info("Exception during onStateChange listener call", e);
536                         }
537                     });
538                 }
539                 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
540                 break;
541             case DELETE:
542                 // Node removed
543                 // leaveconnected state.before = connected;
544                 if (!connectedBefore) {
545                     leaveConnectedState(nodeId, Optional.empty());
546                 }
547                 netconfNodeStateListenerList.forEach(item -> {
548                     try {
549                         item.onRemoved(nodeId);
550                     } catch (Exception e) {
551                         LOG.info("Exception during onRemoved listener call", e);
552                     }
553                 });
554                 // doProcessing(Action.REMOVE, nodeId, root);
555                 break;
556         }
557     }
558
559     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
560         for (final DataTreeModification<Node> change : changes) {
561
562             final DataObjectModification<Node> root = change.getRootNode();
563             if (LOG.isTraceEnabled()) {
564                 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
565                         change.getRootPath(), root);
566             }
567
568             // Catch potential nullpointer exceptions ..
569             try {
570                 ModificationType modificationTyp = root.getModificationType();
571                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
572                 NodeId nodeId = node != null ? node.getNodeId() : null;
573                 if (nodeId == null) {
574                     LOG.warn("L1 without nodeid.");
575                 } else {
576                     if (nodeId.equals(CONTROLLER)) {
577                         // Do not forward any controller related events to devicemanager
578                         LOG.debug("Stop processing for [{}]", nodeId);
579                     } else {
580                         if (modificationTyp == null) {
581                             LOG.warn("L1 empty modification type");
582                         } else {
583                             LOG.trace("handle data tree change with async={}",this.handleDataTreeAsync);
584                             if (this.handleDataTreeAsync) {
585                                 this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp));
586
587                             } else {
588                                 handleDataTreeChange(root, nodeId, modificationTyp);
589                             }
590                         }
591                     }
592                 }
593             } catch (NullPointerException | IllegalStateException e) {
594                 LOG.debug("Data not available at ", e);
595             }
596         } //for
597         LOG.debug("datatreechanged handler completed");
598     }
599
600     // ---- subclasses for listeners
601
602     /**
603      * Clustered listener function to select the right node from DataObjectModification. Called at all nodes.
604      */
605     private class L1 implements ClusteredDataTreeChangeListener<Node> {
606         @Override
607         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
608             LOG.debug("L1 TreeChange enter changes:{}", changes.size());
609             //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
610             onDataTreeChangedHandler(changes);
611             LOG.debug("L1 TreeChange leave");
612         }
613     }
614
615     /**
616      * Data change, called at leader/master
617      */
618     private class L2 implements DataTreeChangeListener<Node> {
619
620         @Override
621         public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
622             LOG.debug("L2 TreeChange enter changes:{}", changes.size());
623             // Do nothing
624             LOG.debug("L2 TreeChange leave");
625         }
626     }
627
628     /* --- private helpers --- */
629     private static @Nullable NetconfNode getNetconfNode(Node node) {
630         return node != null ? node.augmentation(NetconfNode.class) : null;
631     }
632
633     private static boolean isConnected(NetconfNode nNode) {
634         return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
635     }
636
637     private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
638         return node != null ? node.getClusteredConnectionStatus() : null;
639     }
640
641     /* -- LOG related functions -- */
642
643     /** Analyze configuration **/
644     private static @Nullable AkkaConfig getAkkaConfig() {
645         AkkaConfig akkaConfig;
646         try {
647             akkaConfig = AkkaConfig.load();
648             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
649         } catch (Exception e1) {
650             akkaConfig = null;
651             LOG.warn("problem loading akka.conf: " + e1.getMessage());
652         }
653         if (akkaConfig != null && akkaConfig.isCluster()) {
654             LOG.info("cluster mode detected");
655             if (GeoConfig.fileExists()) {
656                 try {
657                     LOG.debug("try to load geoconfig");
658                     GeoConfig.load();
659                 } catch (Exception err) {
660                     LOG.warn("problem loading geoconfig: " + err.getMessage());
661                 }
662             } else {
663                 LOG.debug("no geoconfig file found");
664             }
665         } else {
666             LOG.info("single node mode detected");
667         }
668         return akkaConfig;
669     }
670
671     private boolean isNetconfNodeMaster(NetconfNode nNode) {
672         if (this.isCluster) {
673             LOG.debug("check if me is responsible for node");
674             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
675             @NonNull
676             String masterNodeName =
677                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
678             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
679             if (!masterNodeName.equals(clusterName)) {
680                 LOG.debug("netconf change but me is not master for this node");
681                 return false;
682             }
683         }
684         return true;
685     }
686
687
688
689     @Override
690     public void onConfigChanged() {
691         this.handleDataTreeAsync = this.config.handleAsync();
692         //setting poolsize is not possible atm
693         //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize());
694
695     }
696
697     public class NetconfChangeDataHolder {
698
699         protected final DataObjectModification<Node> root;
700         protected final ModificationType modificationTyp;
701
702         public NetconfChangeDataHolder(DataObjectModification<Node> root, ModificationType modificationTyp) {
703             this.root = root;
704             this.modificationTyp = modificationTyp;
705         }
706
707     }
708 }