Add DCAE and Policy Dmaap Listener Service 50/134050/1
authorChuanyuChen <chenchuanyu@huawei.com>
Mon, 10 Apr 2023 08:57:20 +0000 (16:57 +0800)
committerChuanyuChen <chenchuanyu@huawei.com>
Mon, 10 Apr 2023 08:58:27 +0000 (16:58 +0800)
Add DCAE and Policy Dmaap Listener Service
Issue-ID: USECASEUI-794

Signed-off-by: ChuanyuChen <chenchuanyu@huawei.com>
Change-Id: I1af8093c198fbbc06942441f2ef8f06fe66eec03

intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json
intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json

diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationCallback.java
new file mode 100644 (file)
index 0000000..158cf91
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class DCAENotificationCallback implements NotificationCallback {
+
+    @Override
+    public void activateCallBack(String msg) {
+        NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+        //Todo analyze the event and Report to the Intent Flow;
+    }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dcae/dmaap/DCAENotificationService.java
new file mode 100644 (file)
index 0000000..9ae8483
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.onap.usecaseui.intentanalysis.adapters.policy.dmaap.PolicyNotificationCallback;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DCAENotificationService {
+
+    //config of policy dmaap event subscribe
+    private static final String MONITOR_CONFIG_FILE = "dcae_dmaap_config.json";
+
+    public DCAENotificationService(){
+        init();
+    }
+
+    private void init(){
+        MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new DCAENotificationCallback());
+        monitor.start();
+    }
+}
index 4712d0c..528805d 100644 (file)
@@ -21,7 +21,6 @@
 
 package org.onap.usecaseui.intentanalysis.adapters.dmaap;
 
-import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
@@ -29,7 +28,6 @@ import io.vavr.collection.List;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -53,30 +51,36 @@ import reactor.core.publisher.Mono;
  */
 public class MRTopicMonitor implements Runnable {
 
-    private final String name;
+    private final String configFileName;
+
     private volatile boolean running = false;
+
     private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+
     private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+
     private MRConsumerWrapper consumerWrapper;
+
     private NotificationCallback callback;
 
     /**
      * Constructor
-     * @param name name of topic subscriber in config
+     *
+     * @param configFileName name of topic subscriber file
      * @param callback callbackfunction for received message
      */
-    public MRTopicMonitor(String name, NotificationCallback callback){
-        this.name = name;
+    public MRTopicMonitor(String configFileName, NotificationCallback callback) {
+        this.configFileName = configFileName;
         this.callback = callback;
     }
 
     /**
      * Start the monitoring thread
      */
-    public void start(){
+    public void start() {
         logger.info("Starting Dmaap Bus Monitor");
         try {
-            File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json");
+            File configFile = Resources.getResourceAsFile("dmaapConfig/" + configFileName);
             String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
             JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject();
             consumerWrapper = buildConsumerWrapper(jsonObject);
@@ -93,17 +97,16 @@ public class MRTopicMonitor implements Runnable {
      * Main loop that keep fetching and processing
      */
     @Override
-    public void run(){
-        while (running){
+    public void run() {
+        while (running) {
             try {
-                logger.debug("Topic: {} getting new msg...", name);
+                logger.debug("Topic: {} getting new msg...", consumerWrapper.getTopicName());
                 List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
-                for (JsonElement msg : dmaapMsgs){
-                    logger.debug("Received message: {}" +
-                            "\r\n and processing start", msg);
+                for (JsonElement msg : dmaapMsgs) {
+                    logger.debug("Received message: {}" + "\r\n and processing start", msg);
                     process(msg.toString());
                 }
-            } catch (IOException | RuntimeException e){
+            } catch (IOException | RuntimeException e) {
                 logger.error("fetchMessage encountered error: {}", e);
             }
         }
@@ -113,7 +116,7 @@ public class MRTopicMonitor implements Runnable {
     /**
      * Stop the monitor
      */
-    public void stop(){
+    public void stop() {
         logger.info("{}: exiting", this);
         running = false;
         this.consumerWrapper.close();
@@ -123,7 +126,7 @@ public class MRTopicMonitor implements Runnable {
     private void process(String msg) {
         try {
             callback.activateCallBack(msg);
-        } catch (Exception e){
+        } catch (Exception e) {
             logger.error("process message encountered error: {}", e);
         }
     }
@@ -132,8 +135,8 @@ public class MRTopicMonitor implements Runnable {
         return this.consumerWrapper.fetch();
     }
 
-    private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
-            throws IllegalArgumentException {
+    private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson)
+        throws IllegalArgumentException {
         MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
         return new MRConsumerWrapper(topicParams);
     }
@@ -148,6 +151,7 @@ public class MRTopicMonitor implements Runnable {
          * Name of the "protocol" property.
          */
         protected static final String PROTOCOL_PROP = "Protocol";
+
         /**
          * Fetch timeout.
          */
@@ -159,12 +163,19 @@ public class MRTopicMonitor implements Runnable {
         @Getter
         private final int sleepTime;
 
+        /**
+         * Topic Name to Subscribe
+         */
+        @Getter
+        private String topicName;
+
         /**
          * Counted down when {@link #close()} is invoked.
          */
         private final CountDownLatch closeCondition = new CountDownLatch(1);
 
         protected MessageRouterSubscriber subscriber;
+
         protected MessageRouterSubscribeRequest request;
 
         /**
@@ -173,6 +184,7 @@ public class MRTopicMonitor implements Runnable {
          * @param MRTopicParams parameters for the bus topic
          */
         protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+            this.topicName = MRTopicParams.getTopicName();
             this.fetchTimeout = MRTopicParams.getFetchTimeout();
 
             if (this.fetchTimeout <= 0) {
@@ -190,9 +202,10 @@ public class MRTopicMonitor implements Runnable {
                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
             }
 
-            try{
+            try {
                 this.subscriber = DmaapUtil.buildSubscriber();
-                this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+                this.request = DmaapUtil.buildSubscriberRequest(topicName + "-Subscriber", MRTopicParams.getTopic(),
+                    MRTopicParams.getConsumerGroup(), MRTopicParams.getConsumerInstance());
 
             } catch (Exception e) {
                 throw new IllegalArgumentException("Illegal MrConsumer parameters");
@@ -202,6 +215,7 @@ public class MRTopicMonitor implements Runnable {
 
         /**
          * Try fetch new message. But backoff for some sleepTime when connection fails.
+         *
          * @return
          * @throws IOException
          */
index 7e2b3b6..6dcb932 100644 (file)
@@ -176,6 +176,14 @@ public class MRTopicParams {
         return additionalProps != null;
     }
 
+    public String getTopicName(){
+        if(null == this.topic){
+            return "";
+        }
+        String[] pmTopicSplit = this.topic.split("\\/");
+        return pmTopicSplit[pmTopicSplit.length - 1];
+    }
+
     @NoArgsConstructor(access = AccessLevel.PRIVATE)
     public static class TopicParamsBuilder {
 
@@ -326,8 +334,6 @@ public class MRTopicParams {
             if (topicUrl.startsWith("https")){
                 useHttps = true;
             }
-            String[] pmTopicSplit = topicUrl.split("\\/");
-            topic = pmTopicSplit[pmTopicSplit.length - 1];
 
             this.params.topic = topicUrl;
             this.params.servers = servers;
@@ -370,6 +376,5 @@ public class MRTopicParams {
             this.params.serializationProvider = serializationProvider;
             return this;
         }
-
     }
 }
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventEntity.java
new file mode 100644 (file)
index 0000000..3dce969
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+public class NotificationEventEntity {
+    //The entity id , currently in CLL User Case, it is CLL id
+    private String id;
+
+    //Assurance or modifyBW
+    private String operation;
+
+    // it can be Failed/Success
+    private String result;
+
+    private String reason;
+
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationEventModel.java
new file mode 100644 (file)
index 0000000..2b4c44f
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import java.util.Date;
+
+public class NotificationEventModel {
+
+    private String source;
+
+    private Date timestamp;
+
+    private NotificationEventEntity entity;
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationCallback.java
new file mode 100644 (file)
index 0000000..a93aac9
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class PolicyNotificationCallback implements NotificationCallback {
+    @Override
+    public void activateCallBack(String msg) {
+        NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+        //Todo analyze the event and Report to the Intent Flow;
+    }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/policy/dmaap/PolicyNotificationService.java
new file mode 100644 (file)
index 0000000..4ad32bd
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.springframework.stereotype.Service;
+
+@Service
+public class PolicyNotificationService {
+
+    //config of policy dmaap event subscribe
+    private static final String MONITOR_CONFIG_FILE = "policy_dmaap_config.json";
+
+    public PolicyNotificationService(){
+        init();
+    }
+
+    private void init(){
+        MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new PolicyNotificationCallback());
+        monitor.start();
+    }
+}
index c6cbfce..a05c1be 100644 (file)
@@ -46,14 +46,14 @@ public class DmaapUtil {
         return cut;
     }
 
-    public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+    public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl, String consumerGroup, String consumerId){
         MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
             .name(name)
             .topicUrl(topicUrl)
             .build();
         MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
-            .consumerGroup("1")
-            .consumerId("1")
+            .consumerGroup(consumerGroup)
+            .consumerId(consumerId)
             .sourceDefinition(sourceDefinition)
             .build();
 
index d22e843..892d77e 100644 (file)
@@ -1,20 +1,18 @@
 {
-    "dcae_subscriber":{
-        "type":"message_router",
-        "aaf_username": null,
-        "aaf_password": null,
-        "api_key" : null,
-        "api_secret" : null,
-        "servers" : ["message-router:3904"],
-        "consumer_group" : "intent_analysis_dcaeevent",
-        "consumer_instance" : "intent_analysis_dcaeevent_1",
-        "fetch_timeout" : 15000,
-        "fetch_limit" : 100,
-        "dmaap_info":{
-          "topic_url":"http://message-router:3904/events/INTENT-EVENT",
-          "client_role":"org.onap.uui.intentanalysisSub",
-          "location":"onap",
-          "client_id":"intent-analysis-1"
-        }
-      }
+    "type":"message_router",
+    "aaf_username": null,
+    "aaf_password": null,
+    "api_key" : null,
+    "api_secret" : null,
+    "servers" : ["message-router:3904"],
+    "consumer_group" : "intent_DCAE_event",
+    "consumer_instance" : "intent_DCAE_event_1",
+    "fetch_timeout" : 15000,
+    "fetch_limit" : 100,
+    "dmaap_info":{
+      "topic_url":"http://message-router:3904/events/CCVPN-CL-DCAE-EVENT",
+      "client_role":"org.onap.uui.intentanalysisSub",
+      "location":"onap",
+      "client_id":"intent-analysis-1"
+    }
 }
index 0ec440f..e4d06b1 100644 (file)
@@ -1,20 +1,18 @@
 {
-    "policy_subscriber":{
-        "type":"message_router",
-        "aaf_username": null,
-        "aaf_password": null,
-        "api_key" : null,
-        "api_secret" : null,
-        "servers" : ["message-router:3904"],
-        "consumer_group" : "intent_analysis_policyevent",
-        "consumer_instance" : "intent_analysis_policyevent_1",
-        "fetch_timeout" : 15000,
-        "fetch_limit" : 100,
-        "dmaap_info":{
-          "topic_url":"http://message-router:3904/events/INTENT-EVENT",
-          "client_role":"org.onap.uui.intentanalysisSub",
-          "location":"onap",
-          "client_id":"intent-analysis-1"
-        }
-      }
+    "type":"message_router",
+    "aaf_username": null,
+    "aaf_password": null,
+    "api_key" : null,
+    "api_secret" : null,
+    "servers" : ["message-router:3904"],
+    "consumer_group" : "intent_analysis_policyevent",
+    "consumer_instance" : "intent_analysis_policyevent_1",
+    "fetch_timeout" : 15000,
+    "fetch_limit" : 100,
+    "dmaap_info": {
+      "topic_url":"http://message-router:3904/events/CCVPN-CL-POLICY-EVENT",
+      "client_role":"org.onap.uui.intentanalysisSub",
+      "location":"onap",
+      "client_id":"intent-analysis-1"
+    }
 }