Improve Websocket notification interface 33/121533/3
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>
Tue, 25 May 2021 13:27:29 +0000 (18:57 +0530)
committerRavi Pendurty <ravi.pendurty@highstreet-technologies.com>
Mon, 14 Jun 2021 04:52:54 +0000 (10:22 +0530)
Improve websocket notification interface

Issue-ID: CCSDK-3315
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: I0ded865adddb546ade98df4760e0a32ec964295a
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
31 files changed:
sdnr/wt/common-yang/utils/src/main/java/org/onap/ccsdk/features/sdnr/wt/yang/mapper/YangToolsMapperHelper.java
sdnr/wt/devicemanager-core/provider/pom.xml
sdnr/wt/devicemanager-core/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/eventdatahandler/DeviceManagerDatabaseNotificationService.java
sdnr/wt/devicemanager-core/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/eventdatahandler/ODLEventListenerHandler.java
sdnr/wt/devicemanager-core/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/eventdatahandler/RpcPushNotificationsHandler.java
sdnr/wt/devicemanager-core/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/impl/xml/WebSocketServiceClientImpl.java
sdnr/wt/devicemanager-core/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/impl/xml/WebSocketServiceClientInternal.java
sdnr/wt/devicemanager-onap/onf12/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf/ifpac/microwave/WrapperMicrowaveModelRev170324.java
sdnr/wt/devicemanager-onap/onf12/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf/ifpac/microwave/WrapperMicrowaveModelRev180907.java
sdnr/wt/devicemanager-onap/onf12/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf/ifpac/microwave/WrapperMicrowaveModelRev181010.java
sdnr/wt/devicemanager-onap/onf14/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf14/impl/interfaces/Onf14AirInterfaceNotificationListener.java
sdnr/wt/devicemanager-onap/onf14/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf14/impl/interfaces/Onf14EthernetContainerNotificationListener.java
sdnr/wt/devicemanager-onap/onf14/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/onf14/impl/interfaces/Onf14WireInterfaceNotificationListener.java
sdnr/wt/devicemanager-onap/openroadm/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/openroadm/impl/OpenroadmChangeNotificationListener.java
sdnr/wt/devicemanager-onap/openroadm/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/openroadm/impl/OpenroadmDeviceChangeNotificationListener.java
sdnr/wt/devicemanager-onap/openroadm/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/openroadm/impl/OpenroadmFaultNotificationListener.java
sdnr/wt/devicemanager-onap/openroadm/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/openroadm/impl/OpenroadmNetworkElement.java
sdnr/wt/devicemanager-onap/openroadm/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/devicemanager/openroadm/test/TestOpenRoadmAlarmNotification.java
sdnr/wt/websocketmanager/installer/pom.xml
sdnr/wt/websocketmanager/model/pom.xml
sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/WebsocketManagerService.java
sdnr/wt/websocketmanager/model/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/model/data/SchemaInfo.java
sdnr/wt/websocketmanager/provider/pom.xml
sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerProvider.java
sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/WebSocketManagerSocket.java
sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java [deleted file]
sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java [new file with mode: 0644]
sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/AkkaConfigTest.java
sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/RateFilterTest.java
sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java [new file with mode: 0644]
sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/WebsockerProviderTest.java

index ac6b1b5..3086167 100644 (file)
@@ -219,7 +219,7 @@ public class YangToolsMapperHelper {
                 return true;
             }
         }
-        return false;
+        return ifToImplement.isAssignableFrom(clz);
     }
 
     /**
@@ -235,7 +235,12 @@ public class YangToolsMapperHelper {
         final StringBuilder ret = new StringBuilder(name.length());
         if (!name.startsWith("_"))
             ret.append('_');
+        ret.append(toCamelCase(name));
+        return ret.toString();
+    }
+    public static String toCamelCase(final String name) {
         int start = 0;
+        final StringBuilder ret = new StringBuilder(name.length());
         for (final String word : name.split("-")) {
             if (!word.isEmpty()) {
                 if (start++ == 0) {
@@ -248,7 +253,10 @@ public class YangToolsMapperHelper {
         }
         return ret.toString();
     }
-
+    public static String toCamelCaseClassName(final String name) {
+        final String clsName = toCamelCase(name);
+        return clsName.substring(0,1).toUpperCase()+clsName.substring(1);
+    }
     private static BundleContext getBundleContext() {
         Bundle bundle = FrameworkUtil.getBundle(YangToolsMapperHelper.class);
         return bundle != null ? bundle.getBundleContext() : null;
index 575987d..d37a4a9 100644 (file)
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>com.typesafe</groupId>
-            <artifactId>config</artifactId>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.13</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-cluster_2.13</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>javax.xml.bind</groupId>
             <artifactId>jaxb-api</artifactId>
             <scope>provided</scope>
+        </dependency>
+               <dependency>
+            <groupId>jakarta.activation</groupId>
+            <artifactId>jakarta.activation-api</artifactId>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.glassfish.jaxb</groupId>
index 830969a..dc57626 100644 (file)
@@ -99,7 +99,7 @@ public class DeviceManagerDatabaseNotificationService implements NotificationSer
                 .setAttributeName(eventNotification.getAttributeName()).setCounter(eventNotification.getCounter())
                 .setNewValue(eventNotification.getNewValue()).setObjectIdRef(eventNotification.getObjectId())
                 .setTimeStamp(eventNotification.getTimestamp()).build();
-        this.webSocketService.sendViaWebsockets(nodeId, notification, AttributeValueChangedNotification.QNAME,
+        this.webSocketService.sendViaWebsockets(new NodeId(nodeId), notification, AttributeValueChangedNotification.QNAME,
                 eventNotification.getTimestamp());
     }
 
@@ -119,7 +119,7 @@ public class DeviceManagerDatabaseNotificationService implements NotificationSer
         databaseService.writeEventLog(eventlogEntity);
         ObjectCreationNotification notification = new ObjectCreationNotificationBuilder().setCounter(counter)
                 .setObjectIdRef(objectId).setTimeStamp(eventlogEntity.getTimestamp()).build();
-        this.webSocketService.sendViaWebsockets(nodeId.getValue(), notification, ObjectCreationNotification.QNAME,
+        this.webSocketService.sendViaWebsockets(nodeId, notification, ObjectCreationNotification.QNAME,
                 eventlogEntity.getTimestamp());
     }
 
@@ -132,7 +132,7 @@ public class DeviceManagerDatabaseNotificationService implements NotificationSer
         databaseService.writeEventLog(eventlogEntity);
         ObjectDeletionNotification notification = new ObjectDeletionNotificationBuilder().setCounter(counter)
                 .setObjectIdRef(objectId).setTimeStamp(eventlogEntity.getTimestamp()).build();
-        this.webSocketService.sendViaWebsockets(nodeId.getValue(), notification, ObjectDeletionNotification.QNAME,
+        this.webSocketService.sendViaWebsockets(nodeId, notification, ObjectDeletionNotification.QNAME,
                 eventlogEntity.getTimestamp());
     }
 
@@ -161,12 +161,13 @@ public class DeviceManagerDatabaseNotificationService implements NotificationSer
         } else {
             this.pushAlarmIfNotInMaintenance(nodeName, notificationXml);
         }
+        // Send
         ProblemNotification notification = new ProblemNotificationBuilder().setCounter(faultNotification.getCounter())
                 .setObjectIdRef(faultNotification.getObjectId()).setTimeStamp(faultNotification.getTimestamp())
                 .setProblem(faultNotification.getProblem())
                 .setSeverity(InternalSeverity.toYang(faultNotification.getSeverity())).build();
-        this.webSocketService.sendViaWebsockets(faultNotification.getNodeId(), notification,
-                ObjectDeletionNotification.QNAME, faultNotification.getTimestamp());
+        this.webSocketService.sendViaWebsockets(new NodeId(faultNotification.getNodeId()), notification,
+                ProblemNotification.QNAME, faultNotification.getTimestamp());
     }
 
     private void pushAlarmIfNotInMaintenance(String nodeName, ProblemNotificationXml notificationXml) {
index d031d25..66fcc05 100644 (file)
@@ -48,6 +48,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicema
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectDeletionNotificationBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotificationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +132,7 @@ public class ODLEventListenerHandler implements EventHandlingService, AutoClosea
         // Write first to prevent missing entries
         databaseService.updateNetworkConnection22(e, registrationName);
         databaseService.writeConnectionLog(log);
-        webSocketService.sendViaWebsockets(ownKeyName, notification, ObjectCreationNotification.QNAME,
+        webSocketService.sendViaWebsockets(new NodeId(ownKeyName), notification, ObjectCreationNotification.QNAME,
                 NetconfTimeStampImpl.getConverter().getTimeStamp());
     }
 
@@ -156,7 +157,7 @@ public class ODLEventListenerHandler implements EventHandlingService, AutoClosea
         AttributeValueChangedNotification notification = new AttributeValueChangedNotificationBuilder()
                 .setCounter(popEvntNumber()).setTimeStamp(ts).setObjectIdRef(mountpointNodeName)
                 .setAttributeName("deviceType").setNewValue(deviceType.name()).build();
-        webSocketService.sendViaWebsockets(ownKeyName, notification, AttributeValueChangedNotification.QNAME, ts);
+        webSocketService.sendViaWebsockets(new NodeId(ownKeyName), notification, AttributeValueChangedNotification.QNAME, ts);
     }
 
     /**
@@ -190,7 +191,7 @@ public class ODLEventListenerHandler implements EventHandlingService, AutoClosea
         // Write first to prevent missing entries
         databaseService.removeNetworkConnection(registrationName);
         databaseService.writeConnectionLog(log);
-        webSocketService.sendViaWebsockets(registrationName, notification, ObjectDeletionNotification.QNAME, ts);
+        webSocketService.sendViaWebsockets(new NodeId(registrationName), notification, ObjectDeletionNotification.QNAME, ts);
 
     }
 
@@ -217,7 +218,7 @@ public class ODLEventListenerHandler implements EventHandlingService, AutoClosea
             this.updateNeConnectionRetryWithDelay(nNode, registrationName);
         }
         databaseService.writeConnectionLog(log);
-        webSocketService.sendViaWebsockets(ownKeyName, notification, AttributeValueChangedNotification.QNAME, ts);
+        webSocketService.sendViaWebsockets(new NodeId(ownKeyName), notification, AttributeValueChangedNotification.QNAME, ts);
     }
 
 
@@ -272,7 +273,7 @@ public class ODLEventListenerHandler implements EventHandlingService, AutoClosea
 
         aotsDcaeForwarder.sendProblemNotificationUsingMaintenanceFilter(ownKeyName, notificationXml);
 
-        webSocketService.sendViaWebsockets(ownKeyName, notification, ProblemNotification.QNAME, ts);
+        webSocketService.sendViaWebsockets(new NodeId(ownKeyName), notification, ProblemNotification.QNAME, ts);
     }
 
     @Override
index 57f258b..273231a 100644 (file)
@@ -42,6 +42,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicema
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotificationBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.PushAttributeChangeNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.PushFaultNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,7 @@ public class RpcPushNotificationsHandler implements PushNotifications {
                 new AttributeValueChangedNotificationBuilder().setAttributeName(input.getAttributeName())
                         .setCounter(input.getCounter()).setNewValue(input.getNewValue())
                         .setObjectIdRef(input.getObjectId()).setTimeStamp(input.getTimestamp()).build();
-        webSocketService.sendViaWebsockets(OWNKEYNAME, notification, AttributeValueChangedNotification.QNAME,
+        webSocketService.sendViaWebsockets(new NodeId(input.getNodeId()!=null?input.getNodeId():OWNKEYNAME), notification, AttributeValueChangedNotification.QNAME,
                 input.getTimestamp());
 
     }
@@ -102,7 +103,7 @@ public class RpcPushNotificationsHandler implements PushNotifications {
                 .setCounter(input.getCounter()).setObjectIdRef(input.getObjectId())
                 .setSeverity(InternalSeverity.toYang(input.getSeverity())).setTimeStamp(input.getTimestamp()).build();
         aotsDcaeForwarder.sendProblemNotificationUsingMaintenanceFilter(OWNKEYNAME, notificationXml);
-        webSocketService.sendViaWebsockets(OWNKEYNAME, notification, ProblemNotification.QNAME, input.getTimestamp());
+        webSocketService.sendViaWebsockets(new NodeId(input.getNodeId()!=null?input.getNodeId():OWNKEYNAME), notification, ProblemNotification.QNAME, input.getTimestamp());
     }
 
 }
index 6cf984b..30df8a9 100644 (file)
@@ -20,6 +20,7 @@ package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml;
 import org.eclipse.jdt.annotation.NonNull;
 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.WebsocketManagerService;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.QName;
 
@@ -50,9 +51,9 @@ public class WebSocketServiceClientImpl implements WebSocketServiceClientInterna
 
 
     @Override
-    public void sendViaWebsockets(@NonNull String nodeName, Notification notification, QName qname,
+    public void sendViaWebsockets(@NonNull NodeId nodeId, Notification notification, QName qname,
             DateAndTime timestamp) {
-        this.websocketmanagerService.sendNotification(notification, nodeName, qname, timestamp);
+        this.websocketmanagerService.sendNotification(notification, nodeId, qname, timestamp);
 
     }
 
index 83fbd5c..53be1dc 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.ccsdk.features.sdnr.wt.devicemanager.impl.xml;
 
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.QName;
 
@@ -32,6 +33,6 @@ import org.opendaylight.yangtools.yang.common.QName;
  */
 public interface WebSocketServiceClientInternal extends AutoCloseable {
 
-    public void sendViaWebsockets(@NonNull String nodeName,Notification notification, QName qname, DateAndTime timestamp);
+    public void sendViaWebsockets(@NonNull NodeId nodeId,Notification notification, QName qname, DateAndTime timestamp);
 
 }
index 6b32180..9ed0276 100644 (file)
@@ -206,9 +206,11 @@ public class WrapperMicrowaveModelRev170324 implements OnfMicrowaveModel, Microw
     public void onObjectCreationNotification(ObjectCreationNotification notification) {
         LOG.debug("Got event of type :: {}", ObjectCreationNotification.class.getSimpleName());
         if (notification != null) {
+            // Send devicemanager specific notification for database and ODLUX
             microwaveModelListener.creationNotification(acessor.getNodeId(), notification.getCounter(),
                     notification.getTimeStamp(), Helper.nnGetUniversalId(notification.getObjectIdRef()).getValue());
-            notificationService.sendNotification(notification, acessor.getNodeId().getValue(),
+            // Send model specific notification to WebSocketManager
+            notificationService.sendNotification(notification, acessor.getNodeId(),
                     ObjectCreationNotification.QNAME, notification.getTimeStamp());
         }
     }
@@ -217,9 +219,11 @@ public class WrapperMicrowaveModelRev170324 implements OnfMicrowaveModel, Microw
     public void onObjectDeletionNotification(ObjectDeletionNotification notification) {
         LOG.debug("Got event of type :: {}", ObjectDeletionNotification.class.getSimpleName());
         if (notification != null) {
+            // Send devicemanager specific notification for database and ODLUX
             microwaveModelListener.deletionNotification(acessor.getNodeId(), notification.getCounter(),
                     notification.getTimeStamp(), Helper.nnGetUniversalId(notification.getObjectIdRef()).getValue());
-            notificationService.sendNotification(notification, acessor.getNodeId().getValue(),
+            // Send model specific notification to WebSocketManager
+            notificationService.sendNotification(notification, acessor.getNodeId(),
                     ObjectDeletionNotification.QNAME, notification.getTimeStamp());
         }
     }
@@ -231,8 +235,10 @@ public class WrapperMicrowaveModelRev170324 implements OnfMicrowaveModel, Microw
                 .setCounter(notification.getCounter()).setTimestamp(notification.getTimeStamp())
                 .setObjectId(Helper.nnGetUniversalId(notification.getObjectIdRef()).getValue())
                 .setAttributeName(notification.getAttributeName()).setNewValue(notification.getNewValue()).build();
+        // Send devicemanager specific notification for database and ODLUX
         microwaveModelListener.eventNotification(beventlogEntity);
-        notificationService.sendNotification(notification, acessor.getNodeId().getValue(),
+        // Send model specific notification to WebSocketManager
+        notificationService.sendNotification(notification, acessor.getNodeId(),
                 AttributeValueChangedNotification.QNAME, notification.getTimeStamp());
         if (notificationQueue.isPresent()) {
             notificationQueue.get().put(beventlogEntity);
@@ -249,8 +255,10 @@ public class WrapperMicrowaveModelRev170324 implements OnfMicrowaveModel, Microw
                 .setNodeId(this.acessor.getNodeId().getValue())
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter())
                 .build();
+        // Send devicemanager specific notification for database and ODLUX
         faultService.faultNotification(faultAlarm);
-        notificationService.sendNotification(notification, acessor.getNodeId().getValue(), ProblemNotification.QNAME,
+        // Send model specific notification to WebSocketManager
+        notificationService.sendNotification(notification, acessor.getNodeId(), ProblemNotification.QNAME,
                 notification.getTimeStamp());
     }
 
index 04d72e1..e564881 100644 (file)
@@ -265,8 +265,10 @@ public class WrapperMicrowaveModelRev180907 implements OnfMicrowaveModel, Microw
                 .setNodeId(this.acessor.getNodeId().getValue())
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter())
                 .build();
+        // Send devicemanager specific notification for database and ODLUX
         faultService.faultNotification(faultAlarm);
-        notificationService.sendNotification(notification, acessor.getNodeId().getValue(), ProblemNotification.QNAME,
+        // Send model specific notification to WebSocketManager
+        notificationService.sendNotification(notification, acessor.getNodeId(), ProblemNotification.QNAME,
                 notification.getTimeStamp());
     }
 
index 623fa62..33349ef 100644 (file)
@@ -260,8 +260,10 @@ public class WrapperMicrowaveModelRev181010 implements OnfMicrowaveModel, Microw
                 .setNodeId(this.acessor.getNodeId().getValue())
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter())
                 .build();
+        // Send devicemanager specific notification for database and ODLUX
         faultService.faultNotification(faultAlarm);
-        notificationService.sendNotification(notification, acessor.getNodeId().getValue(), ProblemNotification.QNAME,
+        // Send model specific notification to WebSocketManager
+        notificationService.sendNotification(notification, acessor.getNodeId(), ProblemNotification.QNAME,
                 notification.getTimeStamp());
     }
 
index e1ca580..1f3f309 100644 (file)
@@ -83,7 +83,7 @@ public class Onf14AirInterfaceNotificationListener implements AirInterface20List
                 .setObjectId(notification.getObjectIdRef().getValue()).setSourceType(SourceType.Netconf)
                 .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ObjectDeletionNotification.QNAME, notification.getTimestamp());
 
         log.debug("onObjectDeletionNotification log entry written");
@@ -99,7 +99,7 @@ public class Onf14AirInterfaceNotificationListener implements AirInterface20List
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter())
                 .build();
         serviceProvider.getFaultService().faultNotification(faultAlarm);
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ProblemNotification.QNAME, notification.getTimestamp());
 
     }
@@ -118,7 +118,7 @@ public class Onf14AirInterfaceNotificationListener implements AirInterface20List
                 .setNewValue(notification.getNewValue()).setObjectId(notification.getObjectIdRef().getValue())
                 .setSourceType(SourceType.Netconf).setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 AttributeValueChangedNotification.QNAME, notification.getTimestamp());
         log.debug("onAttributeValueChangedNotification log entry written");
     }
@@ -133,7 +133,7 @@ public class Onf14AirInterfaceNotificationListener implements AirInterface20List
                 .setObjectId(notification.getObjectIdRef().getValue()).setSourceType(SourceType.Netconf)
                 .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ObjectCreationNotification.QNAME, notification.getTimestamp());
         log.debug("onObjectCreationNotification log entry written");
     }
index fb67a24..b76572b 100644 (file)
@@ -96,7 +96,7 @@ public class Onf14EthernetContainerNotificationListener implements EthernetConta
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter().intValue())
                 .build();
         serviceProvider.getFaultService().faultNotification(faultAlarm);
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ProblemNotification.QNAME, notification.getTimestamp());
 
     }
@@ -115,7 +115,7 @@ public class Onf14EthernetContainerNotificationListener implements EthernetConta
                 .setNewValue(notification.getNewValue()).setObjectId(notification.getObjectIdRef().getValue())
                 .setSourceType(SourceType.Netconf).setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 AttributeValueChangedNotification.QNAME, notification.getTimestamp());
 
         log.debug("onAttributeValueChangedNotification log entry written");
@@ -131,7 +131,7 @@ public class Onf14EthernetContainerNotificationListener implements EthernetConta
                 .setObjectId(notification.getObjectIdRef().getValue()).setSourceType(SourceType.Netconf)
                 .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ObjectCreationNotification.QNAME, notification.getTimestamp());
 
         log.debug("onObjectCreationNotification log entry written");
index 8a1d42f..840f2c3 100644 (file)
@@ -83,7 +83,7 @@ public class Onf14WireInterfaceNotificationListener implements WireInterface20Li
         .setSourceType(SourceType.Netconf)
         .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ObjectDeletionNotification.QNAME, notification.getTimestamp());
 
         log.debug("onObjectDeletionNotification log entry written");
@@ -98,7 +98,7 @@ public class Onf14WireInterfaceNotificationListener implements WireInterface20Li
                 .setSeverity(mapSeverity(notification.getSeverity())).setCounter(notification.getCounter().intValue())
                 .build();
         serviceProvider.getFaultService().faultNotification(faultAlarm);
-        serviceProvider.getWebsocketService().sendNotification( notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification( notification, netconfAccessor.getNodeId(),
                 ProblemNotification.QNAME, notification.getTimestamp());
 
     }
@@ -120,7 +120,7 @@ public class Onf14WireInterfaceNotificationListener implements WireInterface20Li
         .setSourceType(SourceType.Netconf)
         .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 AttributeValueChangedNotification.QNAME, notification.getTimestamp());
 
 
@@ -140,7 +140,7 @@ public class Onf14WireInterfaceNotificationListener implements WireInterface20Li
         .setSourceType(SourceType.Netconf)
         .setTimestamp(notification.getTimestamp());
         serviceProvider.getDataProvider().writeEventLog(eventlogBuilder.build());
-        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId().getValue(),
+        serviceProvider.getWebsocketService().sendNotification(notification, netconfAccessor.getNodeId(),
                 ObjectCreationNotification.QNAME, notification.getTimestamp());
 
         log.debug("onObjectCreationNotification log entry written");
index 3af6d7a..3b7f8b0 100644 (file)
@@ -68,14 +68,14 @@ public class OpenroadmChangeNotificationListener implements IetfNetconfNotificat
     @Override
     public void onNetconfConfirmedCommit(NetconfConfirmedCommit notification) {
         log.info("onNetconfConfirmedCommit {} ", notification);
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 NetconfConfirmedCommit.QNAME, NetconfTimeStampImpl.getConverter().getTimeStamp());
     }
 
     @Override
     public void onNetconfSessionStart(NetconfSessionStart notification) {
         log.info("onNetconfSessionStart {} ", notification);
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 NetconfSessionStart.QNAME, NetconfTimeStampImpl.getConverter().getTimeStamp());
 
     }
@@ -83,14 +83,14 @@ public class OpenroadmChangeNotificationListener implements IetfNetconfNotificat
     @Override
     public void onNetconfSessionEnd(NetconfSessionEnd notification) {
         log.info("onNetconfSessionEnd {}", notification);
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 NetconfSessionEnd.QNAME, NetconfTimeStampImpl.getConverter().getTimeStamp());
     }
 
     @Override
     public void onNetconfCapabilityChange(NetconfCapabilityChange notification) {
         log.info("onNetconfCapabilityChange {}", notification);
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 NetconfCapabilityChange.QNAME, NetconfTimeStampImpl.getConverter().getTimeStamp());
     }
 
@@ -120,7 +120,7 @@ public class OpenroadmChangeNotificationListener implements IetfNetconfNotificat
             databaseService.writeEventLog(eventlogBuilder.build());
         }
         log.info("onNetconfConfigChange (2) {}", sb);
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 NetconfConfigChange.QNAME, NetconfTimeStampImpl.getConverter().getTimeStamp());
 
     }
index daea1ad..a39e62c 100644 (file)
@@ -104,7 +104,7 @@ public class OpenroadmDeviceChangeNotificationListener implements OrgOpenroadmDe
             log.info("onDeviceConfigChange (2) {}", sb);
             counter++;
         }
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 ChangeNotification.QNAME, notification.getChangeTime());
     }
 
@@ -119,7 +119,7 @@ public class OpenroadmDeviceChangeNotificationListener implements OrgOpenroadmDe
                 .setCounter(counter).setNewValue(notification.getStatus().getName()).setSourceType(SourceType.Netconf)
                 .setTimestamp(now);
         databaseProvider.writeEventLog(eventlogBuilder.build());
-        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId().getValue(),
+        this.notificationServiceService.sendNotification(notification, this.netconfAccessor.getNodeId(),
                 CreateTechInfoNotification.QNAME, now);
         log.info("Create-techInfo Notification written ");
         counter++;
index 10d0a56..23bb24c 100644 (file)
@@ -25,12 +25,34 @@ package org.onap.ccsdk.features.sdnr.wt.devicemanager.openroadm.impl;
 import org.eclipse.jdt.annotation.NonNull;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.FaultService;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor;
 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.WebsocketManagerService;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev191129.AlarmNotification;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev191129.OrgOpenroadmAlarmListener;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev191129.alarm.ProbableCause;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.probablecause.rev191129.ProbableCauseEnum;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.Resource;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.CircuitPack;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Connection;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Degree;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Device;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Interface;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.InternalLink;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.LineAmplifier;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.OduSncpPg;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Other;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.PhysicalLink;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Port;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Service;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Shelf;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Srg;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.TempService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.VersionedService;
+import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.Xponder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.FaultlogBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.FaultlogEntity;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SourceType;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +68,11 @@ public class OpenroadmFaultNotificationListener implements OrgOpenroadmAlarmList
     private @NonNull WebsocketManagerService notificationService;
     private Integer count = 1;
 
+    private NetconfBindingAccessor netconfAccessor;
 
-    public OpenroadmFaultNotificationListener(DeviceManagerServiceProvider serviceProvider) {
+
+    public OpenroadmFaultNotificationListener(NetconfBindingAccessor accessor, DeviceManagerServiceProvider serviceProvider) {
+        this.netconfAccessor = accessor;
         this.faultEventListener = serviceProvider.getFaultService();
         this.notificationService = serviceProvider.getWebsocketService();
 
@@ -55,24 +80,80 @@ public class OpenroadmFaultNotificationListener implements OrgOpenroadmAlarmList
 
     @Override
     public void onAlarmNotification(AlarmNotification notification) {
+        log.info("AlarmNotification is {} \t {}", notification.getId(), notification.getAdditionalDetail());
+        String affectedResourceName = getAffectedResourceName(notification.getResource().getResource().getResource());
+        String probableCauseName = getProbableCauseName(notification.getProbableCause());
 
-
-        log.info("AlarmNotification {} \t {}", notification.getId(), notification.getAdditionalDetail());
-        final String nodeId = notification.getResource().getDevice().getNodeId().getValue();
-        FaultlogEntity faultAlarm = new FaultlogBuilder().setObjectId(notification.getCircuitId())
-                .setProblem(notification.getProbableCause().getCause().getName()).setSourceType(SourceType.Netconf)
-                .setTimestamp(notification.getRaiseTime()).setId(notification.getId()).setNodeId(nodeId)
+        if (notification.getId() == null) {
+            log.warn("Alarm ID is null. Not logging alarm information to the DB. Alarm ID should not be null. Please fix the same in the Device");
+            return;
+        }
+        FaultlogEntity faultAlarm = new FaultlogBuilder().setObjectId(affectedResourceName)
+                .setProblem(probableCauseName).setSourceType(SourceType.Netconf)
+                .setTimestamp(notification.getRaiseTime()).setId(notification.getId()).setNodeId(netconfAccessor.getNodeId().getValue())
                 .setSeverity(InitialDeviceAlarmReader.checkSeverityValue(notification.getSeverity())).setCounter(count)
                 .build();
 
         this.faultEventListener.faultNotification(faultAlarm);
-        this.notificationService.sendNotification(notification, nodeId, AlarmNotification.QNAME,
+        this.notificationService.sendNotification(notification,new NodeId(netconfAccessor.getNodeId().getValue()), AlarmNotification.QNAME,
                 notification.getRaiseTime());
         count++;
         log.info("Notification is written into the database {}", faultAlarm.getObjectId());
 
     }
 
+    public String getAffectedResourceName(Resource affectedResource) {
+        if (affectedResource instanceof CircuitPack) {
+            return ((CircuitPack)affectedResource).getCircuitPackName();
+        } else if (affectedResource instanceof Port) {
+            return ((Port)affectedResource).getPort().getPortName();
+        } else if (affectedResource instanceof Connection) {
+            return ((Connection)affectedResource).getConnectionName();
+        } else if (affectedResource instanceof PhysicalLink) {
+            return ((PhysicalLink)affectedResource).getPhysicalLinkName();
+        } else if (affectedResource instanceof InternalLink) {
+            return ((InternalLink)affectedResource).getInternalLinkName();
+        } else if (affectedResource instanceof Shelf) {
+            return ((Shelf)affectedResource).getShelfName();
+        } else if (affectedResource instanceof Srg) {
+            return "SRG #- " + ((Srg)affectedResource).getSrgNumber().toString();
+        } else if (affectedResource instanceof Degree) {
+            return "Degree - " + ((Degree)affectedResource).getDegreeNumber().toString();
+        } else if (affectedResource instanceof Service) {
+            return ((Service)affectedResource).getServiceName();
+        } else if (affectedResource instanceof Interface) {
+            return ((Interface)affectedResource).getInterfaceName();
+        } else if (affectedResource instanceof OduSncpPg) {
+            return ((OduSncpPg)affectedResource).getOduSncpPgName();
+        } else if (affectedResource instanceof Device) {
+            return ((Device)affectedResource).getNodeId().getValue();
+        } else if (affectedResource instanceof LineAmplifier) {
+            return "LineAmplifier # - " + ((LineAmplifier)affectedResource).getAmpNumber().toString();
+        } else if (affectedResource instanceof Xponder) {
+            return "Xponder # - "+ ((Xponder)affectedResource).getXpdrNumber().toString();
+        } else if (affectedResource instanceof Other) {
+            return ((Other)affectedResource).getOtherResourceId();
+        } else if (affectedResource instanceof VersionedService) {
+            return ((VersionedService)affectedResource).getVersionedServiceName();
+        } else if (affectedResource instanceof TempService) {
+            return ((TempService)affectedResource).getCommonId();
+        }
 
+        log.warn("Unknown Resource {} received from Notification", affectedResource.getClass().getSimpleName());
+        return "Unknown Resource";
+    }
+
+    public String getProbableCauseName(ProbableCause probableCause) {
+        if (probableCause != null) {
+            ProbableCauseEnum pce = probableCause.getCause();
+            if (pce != null) {
+                return pce.getName();
+            }
+            log.warn("ProbableCauseEnum is NULL");
+            return "Unknown Cause";
+        }
+        log.warn("ProbableCause is NULL");
+        return "Unknown Cause";
+    }
 
 }
index 02f8547..cc406d8 100644 (file)
@@ -90,7 +90,7 @@ public class OpenroadmNetworkElement extends OpenroadmNetworkElementBase {
         this.openRdmListener = new OpenroadmChangeNotificationListener(netconfAccessor, databaseService,
                 serviceProvider.getWebsocketService());
         this.opnRdmFaultListenerRegistrationResult = null;
-        this.opnRdmFaultListener = new OpenroadmFaultNotificationListener(serviceProvider);
+        this.opnRdmFaultListener = new OpenroadmFaultNotificationListener(netconfAccessor, serviceProvider);
         this.opnRdmDeviceListenerRegistrationResult = null;
         this.opnRdmDeviceListener = new OpenroadmDeviceChangeNotificationListener(netconfAccessor, databaseService,
                 serviceProvider.getWebsocketService());
index ff0ddb4..cd54b6c 100644 (file)
@@ -31,7 +31,7 @@ import org.junit.Test;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.openroadm.impl.OpenroadmFaultNotificationListener;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.FaultService;
-import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfBindingAccessor;
 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.WebsocketManagerService;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev191129.AlarmNotification;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.alarm.rev191129.Severity;
@@ -44,6 +44,7 @@ import org.opendaylight.yang.gen.v1.http.org.openroadm.probablecause.rev191129.P
 import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.Device;
 import org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.DeviceBuilder;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 
 public class TestOpenRoadmAlarmNotification {
     private static final String myCircuitId = "Test_Id";
@@ -52,18 +53,25 @@ public class TestOpenRoadmAlarmNotification {
     ProbableCause myProbableCause =
             new ProbableCauseBuilder().setCause(ProbableCauseEnum.AutomaticLaserShutdown).build();
     Device device = new DeviceBuilder().setNodeId(NodeIdType.getDefaultInstance("zNhe2i5")).build();
-    Resource myResource = new ResourceBuilder().setDevice(device).build();
+    org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.CircuitPack resVal =
+            new org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.resource.resource.CircuitPackBuilder()
+            .setCircuitPackName("Slot-0-Port-A").build();
+    org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.Resource affectedResource =
+            new org.opendaylight.yang.gen.v1.http.org.openroadm.resource.rev191129.resource.ResourceBuilder().setResource(resVal).build();
+    Resource myResource = new ResourceBuilder().setResource(affectedResource).setDevice(device).build();
+
+
     static DeviceManagerServiceProvider serviceProvider;
     static @NonNull FaultService faultService;
     static AlarmNotification notification;
     Severity severity;
-    static NetconfAccessor accessor;
+    static NetconfBindingAccessor accessor;
     static WebsocketManagerService notificationService;
-
+    static NodeId nNodeId = new NodeId("nSky");
     @BeforeClass
     public static void init() throws InterruptedException, IOException {
 
-        accessor = mock(NetconfAccessor.class);
+        accessor = mock(NetconfBindingAccessor.class);
         serviceProvider = mock(DeviceManagerServiceProvider.class);
         faultService = mock(FaultService.class);
         notificationService = mock(WebsocketManagerService.class);
@@ -74,10 +82,11 @@ public class TestOpenRoadmAlarmNotification {
     @Test
     public void testNotification() {
         severity = Severity.Critical;
+        when(accessor.getNodeId()).thenReturn(nNodeId);
         when(serviceProvider.getFaultService()).thenReturn(faultService);
         when(serviceProvider.getWebsocketService()).thenReturn(notificationService);
         OpenroadmFaultNotificationListener alarmListener =
-                new OpenroadmFaultNotificationListener(serviceProvider);
+                new OpenroadmFaultNotificationListener(accessor, serviceProvider);
         notification = mock(AlarmNotification.class);
 
         when(notification.getId()).thenReturn(myId);
index a13f064..661a9b3 100755 (executable)
@@ -47,7 +47,7 @@
 
     <dependencies>
         <dependency>
-            <groupId>org.onap.ccsdk.features.sdnr.wt</groupId>
+            <groupId>${project.groupId}</groupId>
             <artifactId>${application.name}-feature</artifactId>
             <version>${project.version}</version>
             <type>xml</type>
@@ -60,7 +60,7 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.onap.ccsdk.features.sdnr.wt</groupId>
+            <groupId>${project.groupId}</groupId>
             <artifactId>${application.name}-provider</artifactId>
             <version>${project.version}</version>
         </dependency>
index 7026b33..ec384d7 100644 (file)
             <artifactId>rfc6991-ietf-yang-types</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal.model</groupId>
+            <artifactId>ietf-topology</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
index bfceb37..305d745 100644 (file)
@@ -2,6 +2,7 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager.model;
 
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.QName;
 
@@ -57,7 +58,7 @@ public interface WebsocketManagerService {
      * @param nodeId
      * @param eventType
      */
-    void sendNotification(Notification notification, String nodeId, QName eventType);
+    void sendNotification(Notification notification, NodeId nodeId, QName eventType);
     /**
      * Send notification via Websocket to the connected clients.
      * @param notification
@@ -65,7 +66,7 @@ public interface WebsocketManagerService {
      * @param eventType
      * @param eventTime
      */
-    void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime);
+    void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime);
 
     /**
      * Send notification via Websocket to the connected clients.
@@ -73,7 +74,7 @@ public interface WebsocketManagerService {
      * @param nodeId
      * @param eventType
      */
-    void sendNotification(DOMNotification notification, String nodeId, QName eventType);
+    void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType);
     /**
      * Send notification via Websocket to the connected clients.
      * @param notification
@@ -81,7 +82,7 @@ public interface WebsocketManagerService {
      * @param eventType
      * @param eventTime
      */
-    void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime);
+    void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime);
 
 
 
index c587a79..4d39753 100644 (file)
@@ -36,12 +36,17 @@ public class SchemaInfo {
 
 
     public SchemaInfo(QName qname) {
-        this.namespace = qname.getNamespace().toString();
-        this.revision = qname.getRevision().isPresent() ? qname.getRevision().get().toString() : null;
-        this.notification = new ArrayList<>();
+        this(qname.getNamespace().toString(),
+                qname.getRevision().isPresent() ? qname.getRevision().get().toString() : null, new ArrayList<>());
         this.notification.add(qname.getLocalName());
     }
 
+    public SchemaInfo(String namespace, String revision, List<String> notifications) {
+        this.namespace = namespace;
+        this.revision = revision;
+        this.notification = notifications;
+    }
+
     public String getNamespace() {
         return namespace;
     }
@@ -66,6 +71,11 @@ public class SchemaInfo {
         this.notification = notification;
     }
 
+    /**
+     * SchemaInfo Validation restrictions: namespace!=null notification=null or if notification list set, then size>0
+     *
+     * @return
+     */
     @JsonIgnore
     public boolean isValid() {
         return this.namespace != null
@@ -74,6 +84,7 @@ public class SchemaInfo {
 
     /**
      * Check if schema(qname based info of notification) matches into this scope
+     *
      * @param schema
      * @return
      */
@@ -87,8 +98,8 @@ public class SchemaInfo {
         if (!this.namespace.equals(schema.getNamespace().toString())) {
             return false;
         }
-        //if revision of scope is set and it does not match => false
-        if (this.revision != null && !this.revision.equals(schema.getRevision())){
+        //if revision of scope is set and it does not match and is not '*' => false
+        if (this.revision != null && (!this.revision.equals(schema.getRevision()) && !this.revision.equals("*"))) {
             return false;
         }
         //if notification of scope is set and is current notification is not in the list
@@ -117,7 +128,7 @@ public class SchemaInfo {
 
     @JsonIgnore
     public void addNotification(String notification) {
-        if(this.notification ==null) {
+        if (this.notification == null) {
             this.notification = new ArrayList<>();
         }
         this.notification.add(notification);
index 0366ed2..fcdaa5e 100644 (file)
             <artifactId>sdnr-wt-websocketmanager-model</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal.model</groupId>
+            <artifactId>ietf-topology</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>sdnr-wt-yang-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
-            <artifactId>sdnr-wt-devicemanager-provider</artifactId>
+            <artifactId>sdnr-wt-devicemanager-core-provider</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
index 0b6e9b4..6100017 100644 (file)
@@ -24,6 +24,7 @@ import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.NotificationO
 import org.onap.ccsdk.features.sdnr.wt.yang.mapper.YangToolsMapperHelper;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.osgi.service.http.HttpService;
@@ -84,24 +85,44 @@ public class WebSocketManagerProvider implements WebsocketManagerService, AutoCl
 
 
     @Override
-    public void sendNotification(Notification notification, String nodeId, QName eventType) {
+    public void sendNotification(Notification notification, NodeId nodeId, QName eventType) {
+        if(!assertNotificationType(notification, eventType)){
+            return;
+        }
         this.sendNotification(notification, nodeId, eventType, YangToolsMapperHelper.getTime(notification,Instant.now()));
     }
 
+    public static boolean assertNotificationType(Notification notification, QName eventType) {
+        final String yangTypeName = eventType.getLocalName();
+        final Class<?> cls = notification.getClass();
+        final String clsNameToTest = YangToolsMapperHelper.toCamelCaseClassName(yangTypeName);
+        if(cls.getSimpleName().equals(clsNameToTest)) {
+            return true;
+        }
+        Class<?>[] ifs = cls.getInterfaces();
+        for(Class<?> clsif:ifs) {
+            if(clsif.getSimpleName().equals(clsNameToTest)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
     @Override
-    public void sendNotification(Notification notification, String nodeId, QName eventType, DateAndTime eventTime) {
-        WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId, eventType, eventTime));
+    public void sendNotification(Notification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
+        WebSocketManagerSocket.broadCast(new NotificationOutput(notification, nodeId.getValue(), eventType, eventTime));
 
     }
 
     @Override
-    public void sendNotification(DOMNotification notification, String nodeId, QName eventType) {
+    public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType) {
         LOG.warn("not yet implemented");
 
     }
 
     @Override
-    public void sendNotification(DOMNotification notification, String nodeId, QName eventType, DateAndTime eventTime) {
+    public void sendNotification(DOMNotification notification, NodeId nodeId, QName eventType, DateAndTime eventTime) {
         LOG.warn("not yet implemented");
 
     }
index 945de3c..a642bda 100644 (file)
@@ -25,6 +25,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.eclipse.jetty.websocket.api.Session;
@@ -41,7 +45,7 @@ import org.slf4j.LoggerFactory;
 
 public class WebSocketManagerSocket extends WebSocketAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class.getName());
+    private static final Logger LOG = LoggerFactory.getLogger(WebSocketManagerSocket.class);
     public static final String MSG_KEY_DATA = "data";
     public static final DataType MSG_KEY_SCOPES = DataType.scopes;
     public static final String MSG_KEY_PARAM = "param";
@@ -54,7 +58,47 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
     private static final Pattern PATTERN_SCOPEREGISTRATION =
             Pattern.compile(REGEX_SCOPEREGISTRATION, Pattern.MULTILINE);
     private static final Random RND = new Random();
+    private static final long SEND_MESSAGE_TIMEOUT_MILLIS = 1500;
+    private static final int QUEUE_SIZE = 100;
 
+    private final Thread sendingSyncThread;
+    private final ArrayBlockingQueue<String> messageQueue;
+    private boolean closed;
+
+    private final Runnable sendingRunner = new Runnable() {
+        @Override
+        public void run() {
+            LOG.debug("isrunning");
+            while (!closed) {
+                try {
+
+                    String message = messageQueue.poll();
+                    if (message != null) {
+                        WebSocketManagerSocket.this.session.getRemote().sendStringByFuture(message)
+                                .get(SEND_MESSAGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                        LOG.info("message sent");
+                    }
+                } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                    LOG.warn("problem pushing message: ", e);
+                }
+
+                if (messageQueue.isEmpty()) {
+                    trySleep(1000);
+                }
+
+            }
+            LOG.debug("isstopped");
+
+        };
+    };
+
+    private static void trySleep(int sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
 
     /**
      * list of all sessionids
@@ -80,6 +124,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
 
     public WebSocketManagerSocket() {
         this.myUniqueSessionId = _genSessionId();
+        this.sendingSyncThread = new Thread(this.sendingRunner);
+        this.messageQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
     }
 
     @Override
@@ -112,6 +158,8 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
     @Override
     public void onWebSocketConnect(Session sess) {
         this.session = sess;
+        closed = false;
+        this.sendingSyncThread.start();
         clientList.put(String.valueOf(this.hashCode()), this);
         LOG.debug("client connected from " + this.getRemoteAdr());
     }
@@ -119,13 +167,14 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
     @Override
     public void onWebSocketClose(int statusCode, String reason) {
         clientList.remove(String.valueOf(this.hashCode()));
+        this.sendingSyncThread.interrupt();
+        closed = true;
         LOG.debug("client disconnected from " + this.getRemoteAdr());
     }
 
     @Override
     public void onWebSocketError(Throwable cause) {
         LOG.debug("error caused on " + this.getRemoteAdr() + " :" + cause.getMessage());
-        // super.onWebSocketError(cause);
     }
 
     private String getRemoteAdr() {
@@ -146,12 +195,12 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
     private boolean manageClientRequest(String request) {
         boolean ret = false;
         final Matcher matcher = PATTERN_SCOPEREGISTRATION.matcher(request);
-        if(!matcher.find()) {
+        if (!matcher.find()) {
             return false;
         }
         try {
             ScopeRegistration registration = mapper.readValue(request, ScopeRegistration.class);
-            if (registration!=null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
+            if (registration != null && registration.validate() && registration.isType(MSG_KEY_SCOPES)) {
                 ret = true;
                 String sessionId = this.getSessionId();
                 UserScopes clientDto = new UserScopes();
@@ -188,9 +237,9 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
     public void send(String msg) {
         try {
             LOG.trace("sending {}", msg);
-            this.session.getRemote().sendString(msg);
-        } catch (Exception e) {
-            LOG.warn("problem sending message: " + e.getMessage());
+            this.messageQueue.put(msg);
+        } catch (InterruptedException e) {
+            LOG.warn("problem putting message into sending queue: " + e.getMessage());
         }
     }
 
@@ -200,7 +249,7 @@ public class WebSocketManagerSocket extends WebSocketAdapter {
 
     private void sendToAll(NotificationOutput output) {
         try {
-            this.sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
+            sendToAll(output.getNodeId(), output.getType(), mapper.writeValueAsString(output));
         } catch (JsonProcessingException e) {
             LOG.warn("problem serializing noitifcation: ", e);
         }
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilter.java
deleted file mode 100644 (file)
index 5f3a5af..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : ccsdk features
- * ================================================================================
- * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
- * All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- */
-package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
-
-import java.time.Duration;
-import java.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Problems of to many notifications during mount of thousand of devices:
- * <ul>
- * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
- * <li>Notification processing blocks user -> App design with notifications popups
- * </ul>
- * Rate filter
- * <ul>
- * <li>Do not use a thread -> Do nothing if there are no notifications
- * <li>Parameter1 integrationTime : Measurement or integration time for period
- * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
- * <li>Start measurement on event received that comes later then
- * </ul>
- *
- * <pre>
- *  Example (e: Event received, rateMaxCount=3)
- *         eee                           e  e e e e e  e  e e e e    e         e                e
- *  ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
- *         P1             P2             P1             P2             P3             P7        P1
- *Overload              no             no            yes            yes             no              no
- *
- *
- *Intention to use:
- *   1. Construct with parameters for WS stream to handle
- *   2.
- * </pre>
- */
-
-public class RateFilter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RateFilter.class.getName());
-
-    private final Duration integrationTime; // Integration time to measure event rate
-    private final long rateMaxCount; //Rate for dropping packets
-    private Instant timeStampPeriodStart; //Time stamp period beginn
-    private Instant timeStampLastEvent; //Measurement interval
-    private long rateCount; // >0: integration running 0: no integration running
-    private boolean overload; //true means in overload status. Change at end of period only.
-    private GetNow get;
-
-    /**
-     * Allow testing with own timestamp provider
-     */
-    public interface GetNow {
-        Instant now();
-    }
-
-    public RateFilter(Duration integrationTime, long rateMaxCount, GetNow getNowMethod) {
-        this.integrationTime = integrationTime;
-        this.rateMaxCount = rateMaxCount;
-        this.get = getNowMethod;
-        this.timeStampLastEvent = Instant.MIN;
-    }
-
-    public RateFilter(Duration integrationTime, long rateMaxCount) {
-        this(integrationTime, rateMaxCount, () -> Instant.now());
-    }
-
-    public synchronized boolean getOverloadStatus() {
-        return overload;
-    }
-
-    /**
-     * Handle filter on event received
-     */
-    public synchronized void filterEvent() {
-        final Instant now = get.now();
-        final Duration durationSinceLastEvent = Duration.between(timeStampLastEvent, now);
-        this.timeStampLastEvent = now;
-
-        if (durationSinceLastEvent.compareTo(integrationTime) >= 0) {
-            //No measurement. Sync and start with period
-            LOG.debug("Sync");
-            timeStampPeriodStart = now;
-            rateCount = 1; //Reset event count .. is part of the
-        } else {
-            //Within period
-            Duration durationPeriod = Duration.between(timeStampPeriodStart, now);
-            rateCount++;
-            boolean endOfPeriod = durationPeriod.compareTo(integrationTime) >= 0;
-            LOG.debug("Period start{}: now:{} end:{} dur:{} int:{}", timeStampPeriodStart, now, endOfPeriod, durationPeriod, integrationTime);
-            if (endOfPeriod) {
-                //Only if end of Period
-                overload = rateCount > rateMaxCount;
-                LOG.debug("Reset overload {}", overload);
-                timeStampPeriodStart = timeStampPeriodStart.plus(integrationTime);
-                rateCount = 0;
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("RateFilter [integrationTime=");
-        builder.append(integrationTime);
-        builder.append(", rateMaxCount=");
-        builder.append(rateMaxCount);
-        builder.append(", timeStampPeriodStart=");
-        builder.append(timeStampPeriodStart);
-        builder.append(", timeStampLastEvent=");
-        builder.append(timeStampLastEvent);
-        builder.append(", rateCount=");
-        builder.append(rateCount);
-        builder.append(", overload=");
-        builder.append(overload);
-        builder.append("]");
-        return builder.toString();
-    }
-}
diff --git a/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java b/sdnr/wt/websocketmanager/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager/utils/RateFilterManager.java
new file mode 100644 (file)
index 0000000..7ffa29e
--- /dev/null
@@ -0,0 +1,323 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Problems of to many notifications during mount of thousand of devices:
+ * <ul>
+ * <li>Overload ODLUX with notification flood -> ODLUX App can not control notifications rate
+ * <li>Notification processing blocks user -> App design with notifications popups
+ * </ul>
+ * Rate filter requirements
+ * <ul>
+ * <li>Use a single thread
+ * <li>Parameter1 integrationTime : Measurement or integration time for period
+ * <li>Parameter2 readMaxCount : Specifies event number per interval indicating overload
+ * <li>Start measurement on event received that comes later then
+ * </ul>
+ *
+ * <pre>
+ *  Example for behavior (e: Event received, rateMaxCount=3)
+ *         eee                           e  e e e e e  e  e e e e    e         e                e
+ *  ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ *         P1             P2             P1             P2             P3             P7        P1
+ *Overload              no             no            yes            yes             no              no
+ * </pre>
+ *
+ * Interface to use:
+ * <ul>
+ * <li>construct RateFilterManager. Parameters are integration time and function to get the actual time
+ * <li>RateFilterManager.getRateFilter() provides rateFilter object for a stream to count events and provide overload
+ * status.
+ * <li>rateFilter.event() count the events during measurement period
+ * <li>rateFilter.getOverloadStatus() indicates status
+ * <li>rateFilter.close() to release this object
+ * </ul>
+ */
+
+public class RateFilterManager implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RateFilterManager.class.getName());
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("dd.mm.yy hh:mm:ss_SSS")
+            .withLocale(Locale.GERMAN).withZone(ZoneId.systemDefault());
+    private static final long CLIENTS_NUMBER_WARNLEVEL = 1000;
+
+    //Configuration
+    private final Duration integrationTime; // Integration time to measure event rate
+    private GetNow get; //Provides actual system time
+    private final Map<Long, RateFilter> rateFilterList;
+    @SuppressWarnings("unused")
+    private final Timer timerTask;
+
+    /**
+     * Allow testing with own timestamp provider Provide actual system time.
+     */
+    public interface GetNow {
+        Instant now();
+    }
+
+    /**
+     * Constructor with all parameters, intended to be used for unit test
+     *
+     * @param integrationTime is the interval length for counting events.
+     * @param rateMaxCountDefault if event count exceed this limit, status changes to overload.
+     * @param startTimer true start time with intervall time
+     * @param get function to provide actual system time.
+     */
+    public RateFilterManager(Duration integrationTime, boolean startTimer, GetNow get) {
+        this.integrationTime = integrationTime;
+        this.get = get;
+
+        this.rateFilterList = Collections.synchronizedMap(new HashMap<Long, RateFilter>());
+        this.timerTask = startTimer ? startTimerTask(integrationTime) : null;
+    }
+
+    /**
+     * Get RateFilter manager
+     *
+     * @param integrationTime is the time to measure events
+     * @param rateMaxCountDefault if exceeded state overload is true
+     */
+    public RateFilterManager(Duration integrationTime) {
+        this(integrationTime, true, () -> Instant.now());
+    }
+
+    /**
+     */
+    /**
+     * Get a specific rate filter for one stream. Use close() to release.
+     *
+     * @param ratePerMinute Rate per Minute for this filter. If 0 never overloaded.
+     * @return RateFilter object for each event stream.
+     * @throws IllegalArgumentException on negative rate
+     */
+    public synchronized RateFilter getRateFilter(long maxRatePerMinute) throws IllegalArgumentException {
+        long maxEventsPerIntegration = convertRPMToMaxCount(maxRatePerMinute);
+        if (maxEventsPerIntegration < 0)
+            throw new IllegalArgumentException(
+                    "Resulting in illegal maxEventsPerIntegration=" + maxEventsPerIntegration);
+        return getRateFilterInstance(maxEventsPerIntegration);
+    }
+
+    @Override
+    public void close() {
+        if (timerTask != null) {
+            timerTask.cancel();
+            timerTask.purge();
+        }
+        rateFilterList.clear();
+    }
+
+    /**
+     * Function to get a new Ratefilter for a connection
+     *
+     * @param maxEventsPerIntegration
+     * @return reference to object with filter status
+     */
+    private RateFilter getRateFilterInstance(long maxEventsPerIntegration) {
+        RateFilter rateFilter;
+        synchronized (rateFilterList) {
+            rateFilter = rateFilterList.get(maxEventsPerIntegration);
+            if (rateFilter == null) {
+                rateFilter = new RateFilter(maxEventsPerIntegration);
+                synchronized (rateFilterList) {
+                    rateFilterList.put(maxEventsPerIntegration, rateFilter);
+                }
+            } else {
+                if (rateFilter.addClient() > CLIENTS_NUMBER_WARNLEVEL)
+                    LOG.warn("Warnlevel {} exceeded for client connections", CLIENTS_NUMBER_WARNLEVEL);
+            }
+        }
+        return rateFilter;
+    }
+
+    private Timer startTimerTask(Duration integrationTime) {
+        long milliseconds = integrationTime.toMillis();
+        LOG.debug("Start startTimerTask with {} ms", milliseconds);
+        Timer time = new Timer();
+        time.scheduleAtFixedRate(new TimeoutHandler(), 0L, milliseconds);
+        return time;
+    }
+
+    private class TimeoutHandler extends TimerTask {
+        @Override
+        public void run() {
+            LOG.debug("Run timeout task at {}", f(get.now()));
+            synchronized (rateFilterList) {
+                rateFilterList.forEach((k, f) -> f.timer());
+            }
+        }
+    }
+
+    /**
+     * Provide nice debug output for Instant and Duration
+     *
+     * @param i with instant
+     * @return output string
+     */
+    private static String f(Instant i) {
+        return i != null ? FORMATTER.format(i) : "null";
+    }
+
+    /**
+     * Convert a rate per minute into events per integration time.
+     *
+     * @param ratePerMinute
+     * @return events per integration time.
+     */
+    private long convertRPMToMaxCount(long ratePerMinute) {
+        return ratePerMinute * integrationTime.toSeconds() / TimeUnit.MINUTES.toSeconds(1);
+    }
+
+    /**
+     * Ratefilter class contains status informaton for one event stream.
+     */
+    public class RateFilter implements Closeable {
+        private final long maxEventsPerIntegration; //uuid and maximum of events without overload
+        private Long clients; // Number of clients for this filter.
+        private long rateCount; // number of events during integration period
+        private boolean overload; //true means in overload status. Change at end of period only.
+
+        /**
+         * Create a new Filter
+         *
+         * @param maxEventsPerIntegration >= 1 characteristics and uuid of this filter. < 1 switched off
+         * @see {@link #close}
+         */
+        private RateFilter(long maxEventsPerIntegration) {
+            synchronized (this) {
+                this.clients = 1L;
+                this.maxEventsPerIntegration = maxEventsPerIntegration;
+                this.rateCount = 0;
+            }
+        }
+
+        /**
+         * Add a client to this filter
+         *
+         * @return number of clients, handled by this filter
+         * @see {@link #close}
+         */
+        private synchronized long addClient() {
+            if (clients >= 1) {
+                ++clients;
+            } else {
+                LOG.warn("Misalligned open/close for {} with number {}", maxEventsPerIntegration, clients);
+            }
+            return clients;
+        }
+
+        /**
+         * Provide actual overload status
+         *
+         * @return status true means overloaded false not overloaded
+         */
+        public synchronized boolean getOverloadStatus() {
+            return overload;
+        }
+
+        /**
+         * Handle filter on event received
+         */
+        public synchronized void event() {
+            rateCount++;
+            LOG.debug("event rc:{}", rateCount);
+        }
+
+        /**
+         * Called if measurement period ends. Device if overload and reset counter.
+         */
+        public synchronized void timer() {
+            //Change overload only at end of period
+            //Always inactive if maxEventsPerIntegration== 0
+            if (maxEventsPerIntegration > 0) {
+                overload = rateCount > maxEventsPerIntegration;
+            }
+            rateCount = 0;
+            LOG.debug("Timer ol:{} rc:{}", overload, rateCount);
+        }
+
+        /**
+         * Get maximum events allowed per integration period
+         *
+         * @return 1 ...
+         */
+        public synchronized long getMaxEventsPerIntegration() {
+            return maxEventsPerIntegration;
+        }
+
+        /**
+         * Get number of client streams.
+         *
+         * @return 1 ...
+         */
+        public synchronized long getClients() {
+            return clients;
+        }
+
+        @Override
+        public void close() {
+            synchronized (rateFilterList) {
+                if (clients == 1) {
+                    LOG.debug("Close and remove last client {}", maxEventsPerIntegration);
+                    rateFilterList.remove(this.maxEventsPerIntegration);
+                    clients--;
+                } else if (clients > 1) {
+                    LOG.debug("Close one client of {} for events {}", clients, maxEventsPerIntegration);
+                    clients--;
+                } else {
+                    LOG.warn("Misaligned new/close for events {}", maxEventsPerIntegration);
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("RateFilter [maxEventsPerIntegration=");
+            builder.append(maxEventsPerIntegration);
+            builder.append(", clients=");
+            builder.append(clients);
+            builder.append(", rateCount=");
+            builder.append(rateCount);
+            builder.append(", overload=");
+            builder.append(overload);
+            builder.append("]");
+            return builder.toString();
+        }
+    }
+}
index f3cf095..df04c38 100644 (file)
@@ -20,13 +20,11 @@ package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.nio.file.Paths;
+import java.nio.file.Files;
 import org.junit.Test;
 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.AkkaConfig;
 
@@ -58,19 +56,6 @@ public class AkkaConfigTest {
     public static String loadResourceContentAsString(String resourceName)
             throws URISyntaxException, FileNotFoundException, IOException {
 
-        StringBuilder sb = new StringBuilder();
-
-        ClassLoader classLoader = AkkaConfigTest.class.getClassLoader();
-        File file = Paths.get(classLoader.getResource(resourceName).toURI()).toFile();
-        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
-            String line = br.readLine();
-
-            while (line != null) {
-                sb.append(line);
-                sb.append(System.lineSeparator());
-                line = br.readLine();
-            }
-        }
-        return sb.toString();
+        return Files.readString(new File("src/test/resources/"+resourceName).toPath());
     }
 }
index f4fab68..d5a940f 100644 (file)
  */
 package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Timer;
+import java.util.TimerTask;
 import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilter;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.RateFilterManager.RateFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,11 +38,13 @@ import org.slf4j.LoggerFactory;
  *
  * <pre>
  *  Testcase (e: 17 Event received, rateMaxCount=3)
- *         eee                           e  e e e e e  e  e e e e    e         e                e
- *  ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
- *         P1:1           P2:1           P1:2           P2:2           P3:2          P4:2       P1:3
- *         1000-1002      2000           3500 10 millis
- *Overload              no             no            yes            yes             no              no
+ *         1 3                           4  5 6 7 8 9 10 11    14   15        16               17     18
+ *     t            t           t          t              t              t              t         t              t
+ *           eee                           e  e e e e e  e  e e e e    e         e                e      e
+ *    ---//--|--------------|-----//-------|--------------|--------------|--------------|---//----|--------------|
+ *           P1:1           P2:1           P1:2           P2:2           P3:2          P4:2       P1:3
+ * ms  500   1000-1002      2000           3500           4500           5500           6500      7500           8500
+ *Overload              no             no            yes                      yes     no              no
  * </pre>
  *
  */
@@ -45,37 +52,140 @@ public class RateFilterTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(RateFilterTest.class.getName());
 
-    private static int MILLIS = 1000;
-    private static long[] now = { 1000, 1001, 1002, //P1:1 0-2
-            3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 3-9
-            4510, 4520, 4530, 4540, 4900, //P2:2 4500 10-14
-            5700, //P3:2 5500 15
-            7000, 8000};//P1:3 16-17
+    private static int INTEGRATIONTIMEMILLIS = 1000;
+    private static long EVENTS_PER_INTERVALL = 4;
+    private static long RATE_PER_MINUTE = EVENTS_PER_INTERVALL * 60;
+    /* Negative event time indicates timer event */
+    private static long[] now = {-500, 1000, 1010, 1020, //P1:1 1-3
+            -1500, -2500, -3500, 3500, 3550, 3560, 3570, 3580, 3590, 3800, //P1:2 3500 4-10
+            -4500, 4510, 4520, 4530, 4540, 4900, //P2:2 4500 11-15
+            -5500, 5700, //P3:2 5500 16
+            -6500, -7500, 7500, 8000};//P1:3 17-18
+    private static boolean[] overload = {false, false, false, false, //P1:1 1-3
+            false, false, false, false, false, false, false, false, false, false, //P1:2 3500 4-10
+            true, true, true, true, true, true, //P2:2 4500 11-15
+            true, true, //P3:2 5500 16
+            false, false, false, false};//P1:3 17-18
+
     private static int idx;
+    private static long millis;
 
     @Test
-    public void test() {
-        RateFilter rateFilter = new RateFilter(Duration.ofMillis(MILLIS), 4, () -> getNow());
+    public void testStates() {
+        reset();
+        RateFilterManager rateFilterManager =
+                new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS), false, () -> getNow());
+        RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
         LOG.info("Init done");
+        assertEquals("Events per integration period", EVENTS_PER_INTERVALL, rateFilter.getMaxEventsPerIntegration());
 
-        for (int t=0; t < 20; t++) {
-            LOG.info("{}", t);
-            rateFilter.filterEvent();
-            LOG.info("{}", rateFilter.getOverloadStatus());
+        for (int t = 1; t < 30; t++) {
+            boolean expected = tick();
+            if (millis < 0) {
+                LOG.info("{} - timer {}", t, millis);
+                rateFilter.timer();
+            } else {
+                LOG.info("{} - event {}", t, millis);
+                rateFilter.event();
+            }
+            LOG.info("Overload={} {}", rateFilter.getOverloadStatus(), expected);
+            assertEquals("Filter activity", expected, rateFilter.getOverloadStatus());
         }
+        rateFilter.close();
+    }
+
+    @Test
+    public void testThread() throws InterruptedException {
+        LOG.info("testThread");
+        reset();
+        RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS));
+        RateFilter rateFilter = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+
+        tick();
+        Thread.sleep(2000);
+
+        Object objectYouNeedToLockOn = new Object();
+        Timer timer = new Timer();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            long localMillis;
+
+            @Override
+            public void run() {
+                long xLocalMillis = localMillis += 10;
+                long xMillis = Math.abs(millis);
+                if (xLocalMillis >= xMillis) {
+                    LOG.info("aTime:{} Millis:{} Idx={}", xLocalMillis, xMillis, idx);
+                    boolean expected = tick();
+                    if (millis > 0) {
+                        //Skip negatives .. handled by timer
+                        rateFilter.event();
+                        boolean actual = rateFilter.getOverloadStatus();
+                        LOG.info("bTime:{} Millis:{} Idx={} Overload={} Expected={} {}", xLocalMillis, xMillis, idx,
+                                actual, expected, actual == expected ? "" : "XXXX");
+                        if (idx >= 30) {
+                            LOG.info("Test is ending");
+                            synchronized (objectYouNeedToLockOn) {
+                                objectYouNeedToLockOn.notify();
+                            }
+                            timer.cancel();
+                        }
+                        assertEquals("Filter activity", expected, rateFilter.getOverloadStatus());
+                    }
+                }
+            }
+        }, 0, 10);
+        synchronized (objectYouNeedToLockOn) {
+            objectYouNeedToLockOn.wait();
+        }
+        //rateFilter.close();
+        LOG.info("Test end");
+    }
+
+    @Test
+    public void testMultipleClients() {
+        RateFilterManager rateFilterManager = new RateFilterManager(Duration.ofMillis(INTEGRATIONTIMEMILLIS));
+        RateFilter rateFilter1 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+        assertEquals("Multiple clients", 1, rateFilter1.getClients());
+        RateFilter rateFilter2 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+        assertEquals("Multiple clients", 2, rateFilter1.getClients());
+        RateFilter rateFilter3 = rateFilterManager.getRateFilter(RATE_PER_MINUTE);
+        assertEquals("Multiple clients", 3, rateFilter1.getClients());
+
+        assertEquals("Similar instances", rateFilter1, rateFilter3);
+
+        RateFilter rateFilterOther = rateFilterManager.getRateFilter(2*RATE_PER_MINUTE);
+        assertNotEquals("Different instances", rateFilter1, rateFilterOther);
+        rateFilterOther.close();
+
+        rateFilter3.close();
+        assertEquals("Multiple clients", 2, rateFilter1.getClients());
+        rateFilter2.close();
+        assertEquals("Multiple clients", 1, rateFilter1.getClients());
+        rateFilter1.close();
+        assertEquals("Multiple clients", 0, rateFilter1.getClients());
+
+        rateFilterManager.close();
+    }
+
+    private Instant getNow() {
+        LOG.debug("Now:{}", millis);
+        return Instant.ofEpochMilli(Math.abs(millis));
+    }
 
+    private void reset() {
+        idx = 0;
     }
 
-    Instant getNow() {
-        long res;
+    private boolean tick() {
         if (idx < now.length) {
-            res = now[idx];
+            millis = now[idx];
         } else {
             int lastIdx = now.length - 1;
-            res = now[lastIdx] + (idx - lastIdx) * MILLIS;
+            millis = now[lastIdx] + (idx - lastIdx) * INTEGRATIONTIMEMILLIS;
         }
+        boolean expected = idx < overload.length ? overload[idx] : false;
         idx++;
-        return Instant.ofEpochMilli(res);
+        return expected;
     }
 
 }
diff --git a/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java b/sdnr/wt/websocketmanager/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/websocketmanager2/test/UserScopeTest.java
new file mode 100644 (file)
index 0000000..885ded3
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.ReducedSchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.SchemaInfo;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.model.data.Scope;
+import org.onap.ccsdk.features.sdnr.wt.websocketmanager.utils.UserScopes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public class UserScopeTest {
+
+
+    @Test
+    public void testAllNodes() {
+        UserScopes scopes1 = new UserScopes();
+        scopes1.setScopes(Arrays.asList(buildScope(null, ProblemNotification.QNAME)));
+
+        assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME)));
+        assertFalse(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME)));
+
+        assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME)));
+
+    }
+
+    @Test
+    public void testRevisionStar() {
+        UserScopes scopes1 = new UserScopes();
+        scopes1.setScopes(
+                Arrays.asList(buildScope(null, ProblemNotification.QNAME.getNamespace().toString(), "*", null)));
+
+        assertTrue(scopes1.hasScope(new ReducedSchemaInfo(ProblemNotification.QNAME)));
+        assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ObjectCreationNotification.QNAME)));
+
+        assertTrue(scopes1.hasScope("RoadmA", new ReducedSchemaInfo(ProblemNotification.QNAME)));
+
+    }
+
+    private static final Scope buildScope(String nodeId, String namespace, String revision,
+            List<String> notifications) {
+        Scope scope = new Scope();
+        scope.setNodeId(nodeId);
+        scope.setSchema(new SchemaInfo(namespace, revision, notifications));
+        return scope;
+    }
+
+    private static final Scope buildScope(String nodeId, QName qname) {
+        Scope scope = new Scope();
+        scope.setNodeId(nodeId);
+        scope.setSchema(new SchemaInfo(qname));
+        return scope;
+    }
+
+}
index bc3cd10..2e64624 100644 (file)
  */
 package org.onap.ccsdk.features.sdnr.wt.websocketmanager2.test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.onap.ccsdk.features.sdnr.wt.websocketmanager.WebSocketManagerProvider;
 import org.opendaylight.mdsal.binding.api.RpcProviderService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ObjectCreationNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.devicemanager.rev190109.ProblemNotificationBuilder;
+import org.opendaylight.yangtools.yang.binding.Notification;
 import org.osgi.service.http.HttpService;
 
 public class WebsockerProviderTest extends Mockito {
@@ -42,4 +48,14 @@ public class WebsockerProviderTest extends Mockito {
 
     }
 
+    @Test
+    public void testTypeAssertion() {
+
+        Notification problemNotification = new ProblemNotificationBuilder().build();
+        assertTrue(WebSocketManagerProvider.assertNotificationType(problemNotification, ProblemNotification.QNAME));
+        assertFalse(
+                WebSocketManagerProvider.assertNotificationType(problemNotification, ObjectCreationNotification.QNAME));
+
+    }
+
 }