--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class DCAENotificationCallback implements NotificationCallback {
+
+ @Override
+ public void activateCallBack(String msg) {
+ NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+ //Todo analyze the event and Report to the Intent Flow;
+ }
+}
--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dcae.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.onap.usecaseui.intentanalysis.adapters.policy.dmaap.PolicyNotificationCallback;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DCAENotificationService {
+
+ //config of policy dmaap event subscribe
+ private static final String MONITOR_CONFIG_FILE = "dcae_dmaap_config.json";
+
+ public DCAENotificationService(){
+ init();
+ }
+
+ private void init(){
+ MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new DCAENotificationCallback());
+ monitor.start();
+ }
+}
package org.onap.usecaseui.intentanalysis.adapters.dmaap;
-import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
*/
public class MRTopicMonitor implements Runnable {
- private final String name;
+ private final String configFileName;
+
private volatile boolean running = false;
+
private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+
private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+
private MRConsumerWrapper consumerWrapper;
+
private NotificationCallback callback;
/**
* Constructor
- * @param name name of topic subscriber in config
+ *
+ * @param configFileName name of topic subscriber file
* @param callback callbackfunction for received message
*/
- public MRTopicMonitor(String name, NotificationCallback callback){
- this.name = name;
+ public MRTopicMonitor(String configFileName, NotificationCallback callback) {
+ this.configFileName = configFileName;
this.callback = callback;
}
/**
* Start the monitoring thread
*/
- public void start(){
+ public void start() {
logger.info("Starting Dmaap Bus Monitor");
try {
- File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json");
+ File configFile = Resources.getResourceAsFile("dmaapConfig/" + configFileName);
String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject();
consumerWrapper = buildConsumerWrapper(jsonObject);
* Main loop that keep fetching and processing
*/
@Override
- public void run(){
- while (running){
+ public void run() {
+ while (running) {
try {
- logger.debug("Topic: {} getting new msg...", name);
+ logger.debug("Topic: {} getting new msg...", consumerWrapper.getTopicName());
List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
- for (JsonElement msg : dmaapMsgs){
- logger.debug("Received message: {}" +
- "\r\n and processing start", msg);
+ for (JsonElement msg : dmaapMsgs) {
+ logger.debug("Received message: {}" + "\r\n and processing start", msg);
process(msg.toString());
}
- } catch (IOException | RuntimeException e){
+ } catch (IOException | RuntimeException e) {
logger.error("fetchMessage encountered error: {}", e);
}
}
/**
* Stop the monitor
*/
- public void stop(){
+ public void stop() {
logger.info("{}: exiting", this);
running = false;
this.consumerWrapper.close();
private void process(String msg) {
try {
callback.activateCallBack(msg);
- } catch (Exception e){
+ } catch (Exception e) {
logger.error("process message encountered error: {}", e);
}
}
return this.consumerWrapper.fetch();
}
- private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
- throws IllegalArgumentException {
+ private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson)
+ throws IllegalArgumentException {
MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
return new MRConsumerWrapper(topicParams);
}
* Name of the "protocol" property.
*/
protected static final String PROTOCOL_PROP = "Protocol";
+
/**
* Fetch timeout.
*/
@Getter
private final int sleepTime;
+ /**
+ * Topic Name to Subscribe
+ */
+ @Getter
+ private String topicName;
+
/**
* Counted down when {@link #close()} is invoked.
*/
private final CountDownLatch closeCondition = new CountDownLatch(1);
protected MessageRouterSubscriber subscriber;
+
protected MessageRouterSubscribeRequest request;
/**
* @param MRTopicParams parameters for the bus topic
*/
protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+ this.topicName = MRTopicParams.getTopicName();
this.fetchTimeout = MRTopicParams.getFetchTimeout();
if (this.fetchTimeout <= 0) {
throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
}
- try{
+ try {
this.subscriber = DmaapUtil.buildSubscriber();
- this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+ this.request = DmaapUtil.buildSubscriberRequest(topicName + "-Subscriber", MRTopicParams.getTopic(),
+ MRTopicParams.getConsumerGroup(), MRTopicParams.getConsumerInstance());
} catch (Exception e) {
throw new IllegalArgumentException("Illegal MrConsumer parameters");
/**
* Try fetch new message. But backoff for some sleepTime when connection fails.
+ *
* @return
* @throws IOException
*/
return additionalProps != null;
}
+ public String getTopicName(){
+ if(null == this.topic){
+ return "";
+ }
+ String[] pmTopicSplit = this.topic.split("\\/");
+ return pmTopicSplit[pmTopicSplit.length - 1];
+ }
+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public static class TopicParamsBuilder {
if (topicUrl.startsWith("https")){
useHttps = true;
}
- String[] pmTopicSplit = topicUrl.split("\\/");
- topic = pmTopicSplit[pmTopicSplit.length - 1];
this.params.topic = topicUrl;
this.params.servers = servers;
this.params.serializationProvider = serializationProvider;
return this;
}
-
}
}
--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+public class NotificationEventEntity {
+ //The entity id , currently in CLL User Case, it is CLL id
+ private String id;
+
+ //Assurance or modifyBW
+ private String operation;
+
+ // it can be Failed/Success
+ private String result;
+
+ private String reason;
+
+}
--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import java.util.Date;
+
+public class NotificationEventModel {
+
+ private String source;
+
+ private Date timestamp;
+
+ private NotificationEventEntity entity;
+}
--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import com.google.gson.Gson;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationCallback;
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.NotificationEventModel;
+
+public class PolicyNotificationCallback implements NotificationCallback {
+ @Override
+ public void activateCallBack(String msg) {
+ NotificationEventModel event = (new Gson()).fromJson(msg, NotificationEventModel.class);
+
+ //Todo analyze the event and Report to the Intent Flow;
+ }
+}
--- /dev/null
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.policy.dmaap;
+
+import org.onap.usecaseui.intentanalysis.adapters.dmaap.MRTopicMonitor;
+import org.springframework.stereotype.Service;
+
+@Service
+public class PolicyNotificationService {
+
+ //config of policy dmaap event subscribe
+ private static final String MONITOR_CONFIG_FILE = "policy_dmaap_config.json";
+
+ public PolicyNotificationService(){
+ init();
+ }
+
+ private void init(){
+ MRTopicMonitor monitor = new MRTopicMonitor(MONITOR_CONFIG_FILE, new PolicyNotificationCallback());
+ monitor.start();
+ }
+}
return cut;
}
- public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+ public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl, String consumerGroup, String consumerId){
MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
.name(name)
.topicUrl(topicUrl)
.build();
MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
- .consumerGroup("1")
- .consumerId("1")
+ .consumerGroup(consumerGroup)
+ .consumerId(consumerId)
.sourceDefinition(sourceDefinition)
.build();
{
- "dcae_subscriber":{
- "type":"message_router",
- "aaf_username": null,
- "aaf_password": null,
- "api_key" : null,
- "api_secret" : null,
- "servers" : ["message-router:3904"],
- "consumer_group" : "intent_analysis_dcaeevent",
- "consumer_instance" : "intent_analysis_dcaeevent_1",
- "fetch_timeout" : 15000,
- "fetch_limit" : 100,
- "dmaap_info":{
- "topic_url":"http://message-router:3904/events/INTENT-EVENT",
- "client_role":"org.onap.uui.intentanalysisSub",
- "location":"onap",
- "client_id":"intent-analysis-1"
- }
- }
+ "type":"message_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "api_key" : null,
+ "api_secret" : null,
+ "servers" : ["message-router:3904"],
+ "consumer_group" : "intent_DCAE_event",
+ "consumer_instance" : "intent_DCAE_event_1",
+ "fetch_timeout" : 15000,
+ "fetch_limit" : 100,
+ "dmaap_info":{
+ "topic_url":"http://message-router:3904/events/CCVPN-CL-DCAE-EVENT",
+ "client_role":"org.onap.uui.intentanalysisSub",
+ "location":"onap",
+ "client_id":"intent-analysis-1"
+ }
}
{
- "policy_subscriber":{
- "type":"message_router",
- "aaf_username": null,
- "aaf_password": null,
- "api_key" : null,
- "api_secret" : null,
- "servers" : ["message-router:3904"],
- "consumer_group" : "intent_analysis_policyevent",
- "consumer_instance" : "intent_analysis_policyevent_1",
- "fetch_timeout" : 15000,
- "fetch_limit" : 100,
- "dmaap_info":{
- "topic_url":"http://message-router:3904/events/INTENT-EVENT",
- "client_role":"org.onap.uui.intentanalysisSub",
- "location":"onap",
- "client_id":"intent-analysis-1"
- }
- }
+ "type":"message_router",
+ "aaf_username": null,
+ "aaf_password": null,
+ "api_key" : null,
+ "api_secret" : null,
+ "servers" : ["message-router:3904"],
+ "consumer_group" : "intent_analysis_policyevent",
+ "consumer_instance" : "intent_analysis_policyevent_1",
+ "fetch_timeout" : 15000,
+ "fetch_limit" : 100,
+ "dmaap_info": {
+ "topic_url":"http://message-router:3904/events/CCVPN-CL-POLICY-EVENT",
+ "client_role":"org.onap.uui.intentanalysisSub",
+ "location":"onap",
+ "client_id":"intent-analysis-1"
+ }
}