1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
25 import javax.annotation.Nullable;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
29 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
30 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
31 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
32 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
34 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
36 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
37 import org.opendaylight.mdsal.binding.api.DataBroker;
38 import org.opendaylight.mdsal.binding.api.DataObjectModification;
39 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
40 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
41 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
42 import org.opendaylight.mdsal.binding.api.DataTreeModification;
43 import org.opendaylight.mdsal.binding.api.MountPoint;
44 import org.opendaylight.mdsal.binding.api.MountPointService;
45 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
46 import org.opendaylight.mdsal.binding.api.RpcProviderService;
47 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
48 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
49 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatus;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 public class NetconfNodeStateServiceImpl implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable {
70 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
71 private static final String APPLICATION_NAME = "NetconfNodeStateService";
72 @SuppressWarnings("unused")
73 private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
76 private static final InstanceIdentifier<Topology> NETCONF_TOPO_IID =
77 InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
78 new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
80 private static final InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
81 InstanceIdentifier.create(NetworkTopology.class)
82 .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
85 private static final DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
86 DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
88 // Name of ODL controller NETCONF instance
89 private static final NodeId CONTROLLER = new NodeId("controller-config");
91 // -- OSGi services, provided
92 private DataBroker dataBroker;
93 private MountPointService mountPointService;
94 private RpcProviderService rpcProviderRegistry;
95 @SuppressWarnings("unused")
96 private NotificationPublishService notificationPublishService;
97 @SuppressWarnings("unused")
98 private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
101 private ListenerRegistration<L1> listenerL1;
102 private ListenerRegistration<L2> listenerL2;
103 @SuppressWarnings("unused")
104 private ClusterSingletonServiceRegistration cssRegistration;
106 private NetconfnodeStateServiceRpcApiImpl rpcApiService;
108 /** Indication if init() function called and fully executed **/
109 private Boolean initializationSuccessful;
111 /** List of all registered listeners **/
112 private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
114 /** List of all registered listeners **/
115 private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
117 /** List of all registered listeners **/
118 private final List<VesNotificationListener> vesNotificationListenerList;
120 /** Indicates if running in cluster configuration **/
121 private boolean isCluster;
123 /** Indicates the name of the cluster **/
124 private String clusterName;
/**
 * Creates the provider with all injected services and listener registrations unset
 * and with empty listener lists. Services are supplied later through the setters.
 */
public NetconfNodeStateServiceImpl() {
    LOG.info("Creating provider for {}", APPLICATION_NAME);

    this.dataBroker = null;
    this.mountPointService = null;
    this.rpcProviderRegistry = null;
    this.notificationPublishService = null;
    this.clusterSingletonServiceProvider = null;

    this.listenerL1 = null;
    this.listenerL2 = null;
    this.initializationSuccessful = false;
    // Copy-on-write lists: listeners may be added or removed while a notification loop iterates
    this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
    this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
    this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
}
144 public void setDataBroker(DataBroker dataBroker) {
145 this.dataBroker = dataBroker;
148 public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
149 this.rpcProviderRegistry = rpcProviderRegistry;
152 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
153 this.notificationPublishService = notificationPublishService;
156 public void setMountPointService(MountPointService mountPointService) {
157 this.mountPointService = mountPointService;
159 public void setClusterSingletonService(ClusterSingletonServiceProvider clusterSingletonService) {
160 this.clusterSingletonServiceProvider = clusterSingletonService;
163 /** Blueprint initialization **/
166 LOG.info("Session Initiated start {}", APPLICATION_NAME);
169 this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
172 // ConfigurationFileRepresentation config = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
174 AkkaConfig akkaConfig = getAkkaConfig();
175 this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
176 this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
178 // RPC Service for specific services
179 this.rpcApiService.setStatusCallback(this);
181 LOG.debug("start NetconfSubscriptionManager Service");
182 //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
183 //this.netconfChangeListener.register();
184 //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
186 listenerL1 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
187 listenerL2 = dataBroker.registerDataTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L2());
189 this.initializationSuccessful = true;
191 LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
194 /** Blueprint destroy-method method */
195 public void destroy() {
201 * @return NetconfnodeStateServiceRpcApiImpl
203 public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
204 return rpcApiService;
208 public GetStatusOutputBuilder getStatus(GetStatusInput input) {
209 return new GetStatusOutputBuilder();
213 public <L extends NetconfNodeConnectListener> @NonNull ListenerRegistration<L> registerNetconfNodeConnectListener(
214 final @NonNull L netconfNodeConnectListener) {
215 LOG.info("Register connect listener {}",netconfNodeConnectListener.getClass().getName());
216 netconfNodeConnectListenerList.add(netconfNodeConnectListener);
218 return new ListenerRegistration<L>() {
220 public @NonNull L getInstance() {
221 return netconfNodeConnectListener;
225 public void close() {
226 LOG.info("Remove connect listener {}",netconfNodeConnectListener);
227 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
233 public <L extends NetconfNodeStateListener> @NonNull ListenerRegistration<L> registerNetconfNodeStateListener(
234 @NonNull L netconfNodeStateListener) {
235 LOG.info("Register state listener {}",netconfNodeStateListener.getClass().getName());
236 netconfNodeStateListenerList.add(netconfNodeStateListener);
238 return new ListenerRegistration<L>() {
240 public @NonNull L getInstance() {
241 return netconfNodeStateListener;
245 public void close() {
246 LOG.info("Remove state listener {}",netconfNodeStateListener);
247 netconfNodeStateListenerList.remove(netconfNodeStateListener);
253 public <L extends VesNotificationListener> @NonNull ListenerRegistration<L> registerVesNotifications(
254 @NonNull L vesNotificationListener) {
255 LOG.info("Register Ves notification listener {}",vesNotificationListener.getClass().getName());
256 vesNotificationListenerList.add(vesNotificationListener);
258 return new ListenerRegistration<L>() {
260 public @NonNull L getInstance() {
261 return vesNotificationListener;
265 public void close() {
266 LOG.info("Remove Ves notification listener {}",vesNotificationListener);
267 vesNotificationListenerList.remove(vesNotificationListener);
273 public void close() {
274 LOG.info("Closing start ...");
276 close(rpcApiService, listenerL1, listenerL2);
277 } catch (Exception e) {
278 LOG.debug("Closing", e);
280 LOG.info("Closing done");
284 * Used to close all Services, that should support AutoCloseable Pattern
289 private void close(AutoCloseable... toCloseList) throws Exception {
290 for (AutoCloseable element : toCloseList) {
291 if (element != null) {
298 * Indication if init() of this bundle successfully done.
299 * @return true if init() was successful. False if not done or not successful.
301 public boolean isInitializationSuccessful() {
302 return this.initializationSuccessful;
/*-------------------------------------------------------------------------------------------
 * Functions for interface DeviceManagerService
 */
310 * For each mounted device a mountpoint is created and this listener is called.
311 * Mountpoint was created or existing. Managed device is now fully connected to node/mountpoint.
312 * @param nNodeId id of the mountpoint
313 * @param netconfNode mountpoint contents
315 private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
317 String mountPointNodeName = nNodeId.getValue();
318 LOG.info("Starting Event listener on Netconf for mountpoint {}", mountPointNodeName);
320 boolean preConditionMissing = false;
321 if (mountPointService == null) {
322 preConditionMissing = true;
323 LOG.warn("No mountservice available.");
325 if (!initializationSuccessful) {
326 preConditionMissing = true;
327 LOG.warn("Devicemanager initialization still pending.");
329 if (preConditionMissing) {
333 if (isNetconfNodeMaster(netconfNode)) {
335 InstanceIdentifier<Node> instanceIdentifier = NETCONF_TOPO_IID.child(Node.class,
336 new NodeKey(new NodeId(mountPointNodeName)));
338 Optional<MountPoint> optionalMountPoint = null;
340 while (!(optionalMountPoint = mountPointService.getMountPoint(instanceIdentifier)).isPresent()
342 LOG.info("Event listener waiting for mount point for Netconf device :: Name : {}", mountPointNodeName);
347 if (!optionalMountPoint.isPresent()) {
348 LOG.warn("Event listener timeout while waiting for mount point for Netconf device :: Name : {} ",
351 // Mountpoint is present for sure
352 MountPoint mountPoint = optionalMountPoint.get();
353 // BindingDOMDataBrokerAdapter.BUILDER_FACTORY;
354 LOG.info("Mountpoint with id: {}", mountPoint.getIdentifier());
356 Optional<DataBroker> optionalNetconfNodeDatabroker = mountPoint.getService(DataBroker.class);
358 if (!optionalNetconfNodeDatabroker.isPresent()) {
359 LOG.info("Slave mountpoint {} without databroker", mountPointNodeName);
361 LOG.info("Master mountpoint {}", mountPointNodeName);
362 DataBroker netconfNodeDataBroker = optionalNetconfNodeDatabroker.get();
365 * --> Call Listers for onConnect() Indication
368 netconfNodeConnectListenerList.forEach(item -> {
370 item.onEnterConnected(nNodeId, netconfNode, netconfNodeDataBroker);
371 } catch (Exception e) {
372 LOG.info("Exception during onEnterConnected listener call", e);
376 LOG.info("Connect indication forwarded for {}", mountPointNodeName);
383 * Leave the connected status to a non connected or removed status
384 * @param action that occurred
385 * @param nNodeId id of the mountpoint
386 * @param nNode mountpoint contents
388 private void leaveConnectedState(NodeId nNodeId) {
389 LOG.info("netconfNode id {}", nNodeId);
390 netconfNodeConnectListenerList.forEach(item -> {
393 item.onLeaveConnected(nNodeId);
395 LOG.warn("Unexpeced null item during onleave");
397 } catch (Exception e) {
398 LOG.info("Exception during onLeaveConnected listener call", e);
403 // ---- subclasses for listeners
406 * Clustered listener function to select the right node from
407 * DataObjectModification
409 private class L1 implements ClusteredDataTreeChangeListener<Node> {
411 public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
412 LOG.info("L1 TreeChange, changes:{}", changes.size());
414 for (final DataTreeModification<Node> change : changes) {
416 final DataObjectModification<Node> root = change.getRootNode();
417 if (LOG.isTraceEnabled()) {
418 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
419 change.getRootPath(), root);
422 // Catch potential nullpointer exceptions ..
424 ModificationType modificationTyp = root.getModificationType();
425 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore()
426 : root.getDataAfter();
427 NodeId nodeId = node != null ? node.getNodeId() : null;
428 if (nodeId != null) {
429 if (nodeId.equals(CONTROLLER)) {
430 // Do not forward any controller related events to devicemanager
431 LOG.debug("Stop processing for [{}]", nodeId);
433 if (modificationTyp != null) {
434 switch (modificationTyp) {
435 case SUBTREE_MODIFIED: // Create or modify sub level node
436 case WRITE: // Create or modify top level node
437 // Treat an overwrite as an update
438 // leaveconnected state.before = connected; state.after != connected
439 // enterConnected state.after == connected
440 // => Here create or update by checking root.getDataBefore() != null
442 boolean connectedBefore, connectedAfter;
443 NetconfNode nNodeAfter = getNetconfNode(root.getDataAfter());
444 connectedAfter = isConnected(nNodeAfter);
445 if (root.getDataBefore() != null) {
447 NetconfNode nodeBefore = getNetconfNode(root.getDataBefore());
448 connectedBefore = isConnected(nodeBefore);
451 connectedBefore = false;
455 "L1 NETCONF Node change with id {} ConnectedBefore {} connectedAfter {} cluster status {} akkaIsCluster",
456 nodeId, connectedBefore, connectedAfter,
457 getClusteredConnectionStatus(nNodeAfter), isCluster);
459 if (!connectedBefore && connectedAfter) {
460 netconfNodeStateListenerList.forEach(item -> {
462 item.onCreated(nodeId, nNodeAfter);
463 } catch (Exception e) {
464 LOG.info("Exception during onCreated listener call", e);
467 enterConnectedState(nodeId, nNodeAfter);
469 LOG.debug("State change {} {}", connectedBefore, connectedAfter);
470 if (connectedBefore && !connectedAfter) {
471 leaveConnectedState(nodeId);
473 netconfNodeStateListenerList.forEach(item -> {
475 item.onStateChange(nodeId, nNodeAfter);
476 } catch (Exception e) {
477 LOG.info("Exception during onStateChange listener call", e);
481 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
485 // leaveconnected state.before = connected;
486 leaveConnectedState(nodeId);
487 netconfNodeStateListenerList.forEach(item -> {
489 item.onRemoved(nodeId);
490 } catch (Exception e) {
491 LOG.info("Exception during onRemoved listener call", e);
494 // doProcessing(Action.REMOVE, nodeId, root);
500 } catch (NullPointerException e) {
501 LOG.info("Data not available at ", e);
507 private static @Nullable NetconfNode getNetconfNode(Node node) {
508 return node != null ? node.augmentation(NetconfNode.class) : null;
511 private static boolean isConnected(NetconfNode nNode) {
512 return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
515 private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
516 return node != null ? node.getClusteredConnectionStatus() : null;
519 * Normal listener function to select the right node from DataObjectModification
521 private class L2 implements DataTreeChangeListener<Node> {
524 public void onDataTreeChanged(@NonNull Collection<DataTreeModification<Node>> changes) {
525 LOG.info("L2 TreeChange, changes:{}", changes.size());
529 /* -- LOG related functions -- */
531 /** Analyze configuration **/
532 private static @Nullable AkkaConfig getAkkaConfig() {
533 AkkaConfig akkaConfig;
535 akkaConfig = AkkaConfig.load();
536 LOG.debug("akka.conf loaded: " + akkaConfig.toString());
537 } catch (Exception e1) {
539 LOG.warn("problem loading akka.conf: " + e1.getMessage());
541 if (akkaConfig != null && akkaConfig.isCluster()) {
542 LOG.info("cluster mode detected");
543 if (GeoConfig.fileExists()) {
545 LOG.debug("try to load geoconfig");
547 } catch (Exception err) {
548 LOG.warn("problem loading geoconfig: " + err.getMessage());
551 LOG.debug("no geoconfig file found");
554 LOG.info("single node mode detected");
559 private boolean isNetconfNodeMaster(NetconfNode nNode) {
560 if (this.isCluster) {
561 LOG.debug("check if me is responsible for node");
562 ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
563 @SuppressWarnings("null")
564 @NonNull String masterNodeName = ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
565 LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + this.clusterName);
566 if (!masterNodeName.equals(this.clusterName)) {
567 LOG.debug("netconf change but me is not master for this node");
575 private void sleepMs(int milliseconds) {
577 Thread.sleep(milliseconds);
578 } catch (InterruptedException e) {
579 LOG.debug("Interrupted sleep");
580 // Restore interrupted state...
581 Thread.currentThread().interrupt();