Integrate sdc notification 78/82578/1
authorromaingimbert <romain.gimbert@orange.com>
Mon, 18 Mar 2019 10:18:47 +0000 (11:18 +0100)
committerromaingimbert <romain.gimbert@orange.com>
Mon, 18 Mar 2019 10:18:47 +0000 (11:18 +0100)
-refactor code
-beggining implementation

Change-Id: I9f66d64d9a375ef96e0248f85e2c06828f1063eb
Issue-ID: EXTAPI-158
Signed-off-by: romaingimbert <romain.gimbert@orange.com>
src/main/java/org/onap/nbi/apis/hub/HubResource.java
src/main/java/org/onap/nbi/apis/hub/model/EventType.java
src/main/java/org/onap/nbi/apis/hub/service/CheckDMaaPEventsManager.java [deleted file]
src/main/java/org/onap/nbi/apis/hub/service/DMaaPEventsScheduler.java [deleted file]
src/main/java/org/onap/nbi/apis/hub/service/EventFactory.java
src/main/java/org/onap/nbi/apis/hub/service/dmaap/CheckDMaaPEventsManager.java [new file with mode: 0644]
src/main/java/org/onap/nbi/apis/hub/service/dmaap/DMaaPEventsScheduler.java [new file with mode: 0644]
src/main/java/org/onap/nbi/apis/status/OnapClient.java
src/main/resources/application-test.properties
src/main/resources/application.properties

index ac073c1..016ebca 100755 (executable)
@@ -18,7 +18,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import org.onap.nbi.apis.hub.model.Subscriber;
 import org.onap.nbi.apis.hub.model.Subscription;
-import org.onap.nbi.apis.hub.service.CheckDMaaPEventsManager;
+import org.onap.nbi.apis.hub.service.dmaap.CheckDMaaPEventsManager;
 import org.onap.nbi.apis.hub.service.SubscriptionService;
 import org.onap.nbi.commons.JsonRepresentation;
 import org.onap.nbi.commons.MultiCriteriaRequestBuilder;
index b4e1f1a..5db21f3 100644 (file)
@@ -28,7 +28,9 @@ public enum EventType {
 
   SERVICE_ATTRIBUTE_VALUE_CHANGE("ServiceAttributeValueChangeNotification"),
 
-  SERVICE_REMOVE("ServiceRemoveNotification");
+  SERVICE_REMOVE("ServiceRemoveNotification"),
+
+  SDC_DISTRIBUTION("SdcDistributionNotification");
 
   private String value;
 
diff --git a/src/main/java/org/onap/nbi/apis/hub/service/CheckDMaaPEventsManager.java b/src/main/java/org/onap/nbi/apis/hub/service/CheckDMaaPEventsManager.java
deleted file mode 100644 (file)
index b45647f..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Copyright (c) 2019 Huawei
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.onap.nbi.apis.hub.service;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import javax.annotation.PostConstruct;
-import org.onap.nbi.OnapComponentsUrlPaths;
-import org.onap.nbi.apis.hub.model.Event;
-import org.onap.nbi.apis.hub.model.EventType;
-import org.onap.nbi.apis.hub.model.ServiceInstanceEvent;
-import org.onap.nbi.apis.hub.repository.SubscriberRepository;
-import org.onap.nbi.apis.serviceorder.model.RelatedParty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-import org.springframework.web.client.RestTemplate;
-import org.springframework.web.util.UriComponentsBuilder;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-
-@Service
-public class CheckDMaaPEventsManager {
-
-  public static final String RESPONSE_STATUS = "response status : ";
-  public static final String RETURNS = " returns ";
-  public static final String ERROR_ON_CALLING = "error on calling ";
-
-  @Autowired
-  private RestTemplate restTemplate;
-
-  @Autowired
-  private SubscriberRepository subscriberRepository;
-
-  @Autowired
-  private NotifierService notifier;
-
-  @Value("${dmaap.host}")
-  private String dmaapHostname;
-
-  @Value("${dmaap.topic}")
-  private String topic;
-
-  @Value("${dmaap.consumergroup}")
-  private String consumerGroup;
-
-  @Value("${dmaap.consumerid}")
-  private String consumerId;
-
-  @Value("${dmaap.timeout}")
-  private String timeout;
-
-  private final Logger logger = LoggerFactory.getLogger(CheckDMaaPEventsManager.class);
-
-  private String dmaapGetEventsUrl;
-
-  @PostConstruct
-  private void setUpAndLogDMaaPUrl() {
-    dmaapGetEventsUrl = new StringBuilder().append(dmaapHostname)
-        .append(OnapComponentsUrlPaths.DMAAP_CONSUME_EVENTS).toString();
-    logger.info("DMaaP Get Events url :  " + dmaapGetEventsUrl);
-  }
-
-  public void checkForDMaaPAAIEvents() {
-    ObjectMapper mapper = new ObjectMapper();
-
-
-
-    List<String> dmaapResponse = callDMaaPGetEvents();
-    if (!CollectionUtils.isEmpty(dmaapResponse)) {
-      for (int i = 0; i < dmaapResponse.size(); i++) {
-        String aaiEventString = dmaapResponse.get(i);
-        if (logger.isDebugEnabled()) {
-          logger.debug("aai event returned was {}", aaiEventString);
-        }
-        try {
-          JsonNode jsonNode = mapper.readValue(aaiEventString, JsonNode.class);
-          JsonNode eventHeader = jsonNode.get("event-header");
-          String aaiEventEntityType = eventHeader.get("entity-type").asText();
-          String action = eventHeader.get("action").asText();
-          if (logger.isDebugEnabled()) {
-            logger.debug("aaiEventEntityType is {} and action is {}", aaiEventEntityType, action);
-          }
-          if (aaiEventEntityType.equals("service-instance")) {
-            {
-              // parse the AAI-EVENT service-instance tree
-              ServiceInstanceEvent serviceInstanceEvent = new ServiceInstanceEvent();
-              RelatedParty relatedParty = new RelatedParty();
-              JsonNode entity = jsonNode.get("entity");
-              relatedParty.setId(entity.get("global-customer-id").asText());
-              relatedParty.setName(entity.get("subscriber-name").asText());
-              serviceInstanceEvent.setRelatedParty(relatedParty);
-              JsonNode childServiceSubscription = entity.get("service-subscriptions");
-              JsonNode serviceSubscriptions = childServiceSubscription.get("service-subscription");
-              JsonNode serviceSubscription = serviceSubscriptions.get(0);
-              String serviceSubscriptionPrint = serviceSubscription.toString();
-              JsonNode childserviceInstances = serviceSubscription.get("service-instances");
-              JsonNode serviceInstances = childserviceInstances.get("service-instance");
-              JsonNode serviceInstance = serviceInstances.get(0);
-              serviceInstanceEvent.setId(serviceInstance.get("service-instance-id").asText());
-              serviceInstanceEvent.setHref("service/" + serviceInstance.get("service-instance-id").asText());
-              serviceInstanceEvent.setState(serviceInstance.get("orchestration-status").asText());
-              if (action.equals("CREATE")) {
-                if (logger.isDebugEnabled()) {
-                  logger.debug("sending service inventory event to listeners");
-                }
-                processEvent(
-                    EventFactory.getEvent(EventType.SERVICE_CREATION, serviceInstanceEvent));
-              } else if (action.equals("DELETE")) {
-                processEvent(EventFactory.getEvent(EventType.SERVICE_REMOVE, serviceInstanceEvent));
-              } else if (action.equals("UPDATE")) {
-                processEvent(EventFactory.getEvent(EventType.SERVICE_ATTRIBUTE_VALUE_CHANGE,
-                    serviceInstanceEvent));
-              }
-
-
-            }
-
-          }
-
-        } catch (JsonParseException e) {
-          logger.error(" unable to Parse AAI Event JSON String {}, exception is", aaiEventString,
-              e.getMessage());
-        } catch (JsonMappingException e) {
-          logger.error(" unable to Map AAI Event JSON String {} to Java Pojo, exception is",
-              aaiEventString, e.getMessage());
-        } catch (IOException e) {
-          logger.error("IO Error when parsing AAI Event JSON String {} ", aaiEventString,
-              e.getMessage());
-        }
-      }
-    }
-  }
-
-  public List<String> callDMaaPGetEvents() {
-
-    String dmaapGetEventsUrlFormated = dmaapGetEventsUrl.replace("$topic", topic);
-    dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumergroup", consumerGroup);
-    dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumerid", consumerId);
-    dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$timeout", timeout);
-
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Calling DMaaP Url : " + dmaapGetEventsUrlFormated);
-    }
-    UriComponentsBuilder callURI = UriComponentsBuilder.fromHttpUrl(dmaapGetEventsUrlFormated);
-    ResponseEntity<Object> response = callDMaaP(callURI.build().encode().toUri());
-    return (List<String>) response.getBody();
-
-  }
-
-  private ResponseEntity<Object> callDMaaP(URI callURI) {
-    ResponseEntity<Object> response =
-        restTemplate.exchange(callURI, HttpMethod.GET, buildRequestHeader(), Object.class);
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("response body : {} ", response.getBody().toString());
-      logger.debug("response status : {}", response.getStatusCodeValue());
-    }
-    return response;
-  }
-
-  private HttpEntity<String> buildRequestHeader() {
-    HttpHeaders httpHeaders = new HttpHeaders();
-    httpHeaders.add("Accept", "application/json");
-    httpHeaders.add("Content-Type", "application/json");
-    return new HttpEntity<>("parameters", httpHeaders);
-  }
-
-  /**
-   * Retrieve subscribers that match an event and fire notification asynchronously
-   * 
-   * @param event
-   */
-  private void processEvent(Event event) {
-    subscriberRepository.findSubscribersUsingEvent(event).forEach(sub -> notifier.run(sub, event));
-  }
-
-}
diff --git a/src/main/java/org/onap/nbi/apis/hub/service/DMaaPEventsScheduler.java b/src/main/java/org/onap/nbi/apis/hub/service/DMaaPEventsScheduler.java
deleted file mode 100644 (file)
index 20bc2d9..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Copyright (c) 2019 Huawei
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.onap.nbi.apis.hub.service;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Profile;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Service;
-
-@Profile("default")
-@Service
-@EnableScheduling
-public class DMaaPEventsScheduler {
-
-  @Autowired
-  CheckDMaaPEventsManager checkDMaaPEventsManager;
-
-  @Scheduled(fixedDelayString = "${dmaapCheck.schedule}",
-      initialDelayString = "${dmaapCheck.initial}")
-  private void processDMaaPEvents() {
-      checkDMaaPEventsManager.checkForDMaaPAAIEvents();
-    
-  }
-}
-
index ed34322..2fe533b 100644 (file)
@@ -1,18 +1,22 @@
 /**
  * Copyright (c) 2018 Orange
  *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.onap.nbi.apis.hub.service;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.MappingJsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -24,72 +28,90 @@ import org.onap.nbi.apis.serviceorder.model.ServiceOrder;
 import org.onap.nbi.apis.serviceorder.model.ServiceOrderItem;
 import org.onap.nbi.commons.JacksonFilter;
 import org.onap.nbi.commons.JsonRepresentation;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.MappingJsonFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class EventFactory {
 
-  private static final ObjectMapper mapper = new ObjectMapper(new MappingJsonFactory());
+    private static final ObjectMapper mapper = new ObjectMapper(new MappingJsonFactory());
+    private static final Logger logger = LoggerFactory.getLogger(EventFactory.class);
+
+    public static Event getEvent(EventType eventType, ServiceOrder serviceOrder,
+        ServiceOrderItem serviceOrderItem) {
+        Event event = new Event();
+        event.setEventId(UUID.randomUUID().toString());
+        event.setEventDate(new Date());
+        event.setEventType(eventType.value());
+
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+        mapper.setDateFormat(df);
 
-  public static Event getEvent(EventType eventType, ServiceOrder serviceOrder,
-      ServiceOrderItem serviceOrderItem) {
-    Event event = new Event();
-    event.setEventId(UUID.randomUUID().toString());
-    event.setEventDate(new Date());
-    event.setEventType(eventType.value());
+        JsonNode serviceOrderJson = mapper.valueToTree(filterServiceOrder(serviceOrder));
 
-    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-    mapper.setDateFormat(df);
+        if (EventType.SERVICE_ORDER_ITEM_STATE_CHANGE.equals(eventType)) {
+            JsonNode serviceOrderItemJson = mapper.valueToTree(serviceOrderItem);
+            ((ObjectNode) serviceOrderJson).putArray("orderItem").add(serviceOrderItemJson);
+        }
 
-    JsonNode serviceOrderJson = mapper.valueToTree(filterServiceOrder(serviceOrder));
+        event.setEvent(serviceOrderJson);
 
-    if (EventType.SERVICE_ORDER_ITEM_STATE_CHANGE.equals(eventType)) {
-      JsonNode serviceOrderItemJson = mapper.valueToTree(serviceOrderItem);
-      ((ObjectNode) serviceOrderJson).putArray("orderItem").add(serviceOrderItemJson);
+        return event;
     }
 
-    event.setEvent(serviceOrderJson);
+    public static Event getEvent(EventType eventType, ServiceInstanceEvent serviceInstanceEvent) {
+        Event event = new Event();
+        event.setEventId(UUID.randomUUID().toString());
+        event.setEventDate(new Date());
+        event.setEventType(eventType.value());
 
-    return event;
-  }
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+        mapper.setDateFormat(df);
 
-  public static Event getEvent(EventType eventType, ServiceInstanceEvent serviceInstanceEvent) {
-    Event event = new Event();
-    event.setEventId(UUID.randomUUID().toString());
-    event.setEventDate(new Date());
-    event.setEventType(eventType.value());
+        JsonNode serviceInstanceJson = mapper.valueToTree(serviceInstanceEvent);
 
-    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-    mapper.setDateFormat(df);
+        event.setEvent(serviceInstanceJson);
 
-    JsonNode serviceInstanceJson = mapper.valueToTree(serviceInstanceEvent);
+        return event;
+    }
 
-    event.setEvent(serviceInstanceJson);
+    public static Event getEvent(EventType eventType, String eventString) {
+        Event event = new Event();
+        event.setEventId(UUID.randomUUID().toString());
+        event.setEventDate(new Date());
+        event.setEventType(eventType.value());
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+        mapper.setDateFormat(df);
+        JsonNode serviceInstanceJson = null;
+        try {
+            serviceInstanceJson = mapper.readTree(eventString);
+            event.setEvent(serviceInstanceJson);
+            return event;
+        } catch (IOException e) {
+            logger.error("IO Error when parsing Event JSON String {} ", eventString,
+                e.getMessage());
+        }
+        return null;
+    }
 
-    return event;
-  }
 
+    /**
+     * Filter ServiceOrderObject to produce a lightweight object that fit the eventBody specification
+     *
+     * @param serviceOrder
+     * @return
+     */
+    private static Object filterServiceOrder(final ServiceOrder serviceOrder) {
 
-  /**
-   * Filter ServiceOrderObject to produce a lightweight object that fit the eventBody specification
-   * 
-   * @param serviceOrder
-   * @return
-   */
-  private static Object filterServiceOrder(final ServiceOrder serviceOrder) {
+        Object filteredServiceOrder = null;
 
-    Object filteredServiceOrder = null;
+        if (serviceOrder != null) {
+            JsonRepresentation jsonRepresentation = new JsonRepresentation();
+            jsonRepresentation.add("id").add("href").add("externalId").add("state").add("orderDate")
+                .add("completionDateTime").add("orderItem");
 
-    if (serviceOrder != null) {
-      JsonRepresentation jsonRepresentation = new JsonRepresentation();
-      jsonRepresentation.add("id").add("href").add("externalId").add("state").add("orderDate")
-          .add("completionDateTime").add("orderItem");
+            filteredServiceOrder = JacksonFilter.createNode(serviceOrder, jsonRepresentation);
+        }
 
-      filteredServiceOrder = JacksonFilter.createNode(serviceOrder, jsonRepresentation);
+        return filteredServiceOrder;
     }
-
-    return filteredServiceOrder;
-  }
 }
diff --git a/src/main/java/org/onap/nbi/apis/hub/service/dmaap/CheckDMaaPEventsManager.java b/src/main/java/org/onap/nbi/apis/hub/service/dmaap/CheckDMaaPEventsManager.java
new file mode 100644 (file)
index 0000000..4ff40e6
--- /dev/null
@@ -0,0 +1,240 @@
+/**
+ * Copyright (c) 2019 Huawei
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.onap.nbi.apis.hub.service.dmaap;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.text.MessageFormat;
+import java.util.List;
+import javax.annotation.PostConstruct;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.nbi.OnapComponentsUrlPaths;
+import org.onap.nbi.apis.hub.model.Event;
+import org.onap.nbi.apis.hub.model.EventType;
+import org.onap.nbi.apis.hub.model.ServiceInstanceEvent;
+import org.onap.nbi.apis.hub.repository.SubscriberRepository;
+import org.onap.nbi.apis.hub.service.EventFactory;
+import org.onap.nbi.apis.hub.service.NotifierService;
+import org.onap.nbi.apis.serviceorder.model.RelatedParty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponentsBuilder;
+
+
+@Service
+public class CheckDMaaPEventsManager {
+
+    public static final String RESPONSE_STATUS = "response status : ";
+    public static final String RETURNS = " returns ";
+    public static final String ERROR_ON_CALLING = "error on calling ";
+    private final Logger logger = LoggerFactory.getLogger(CheckDMaaPEventsManager.class);
+    @Autowired
+    private RestTemplate restTemplate;
+    @Autowired
+    private SubscriberRepository subscriberRepository;
+    @Autowired
+    private NotifierService notifier;
+    @Value("${dmaap.host}")
+    private String dmaapHostname;
+    @Value("${dmaap.aai.topic}")
+    private String aaiTopic;
+    @Value("${dmaap.sdc.topic}")
+    private String sdcTopic;
+    @Value("${dmaap.consumergroup}")
+    private String consumerGroup;
+    @Value("${dmaap.consumerid}")
+    private String consumerId;
+    @Value("${dmaap.timeout}")
+    private String timeout;
+    private String dmaapGetEventsUrl;
+
+    @PostConstruct
+    private void setUpAndLogDMaaPUrl() {
+        dmaapGetEventsUrl = new StringBuilder().append(dmaapHostname)
+            .append(OnapComponentsUrlPaths.DMAAP_CONSUME_EVENTS).toString();
+        logger.info("DMaaP Get Events url :  " + dmaapGetEventsUrl);
+    }
+
+    public void checkForDMaaPAAIEvents() {
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> dmaapResponse = callDMaaPGetEvents(aaiTopic);
+        if (!CollectionUtils.isEmpty(dmaapResponse)) {
+            for (int i = 0; i < dmaapResponse.size(); i++) {
+                String aaiEventString = dmaapResponse.get(i);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("aai event returned was {}", aaiEventString);
+                }
+                try {
+                    JsonNode jsonNode = mapper.readValue(aaiEventString, JsonNode.class);
+                    JsonNode eventHeader = jsonNode.get("event-header");
+                    String aaiEventEntityType = eventHeader.get("entity-type").asText();
+                    String action = eventHeader.get("action").asText();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("aaiEventEntityType is {} and action is {}", aaiEventEntityType, action);
+                    }
+                    if (aaiEventEntityType.equals("service-instance")) {
+                        {
+                            // parse the AAI-EVENT service-instance tree
+                            ServiceInstanceEvent serviceInstanceEvent = new ServiceInstanceEvent();
+                            RelatedParty relatedParty = new RelatedParty();
+                            JsonNode entity = jsonNode.get("entity");
+                            relatedParty.setId(entity.get("global-customer-id").asText());
+                            relatedParty.setName(entity.get("subscriber-name").asText());
+                            serviceInstanceEvent.setRelatedParty(relatedParty);
+                            JsonNode childServiceSubscription = entity.get("service-subscriptions");
+                            JsonNode serviceSubscriptions = childServiceSubscription.get("service-subscription");
+                            JsonNode serviceSubscription = serviceSubscriptions.get(0);
+                            String serviceSubscriptionPrint = serviceSubscription.toString();
+                            JsonNode childserviceInstances = serviceSubscription.get("service-instances");
+                            JsonNode serviceInstances = childserviceInstances.get("service-instance");
+                            JsonNode serviceInstance = serviceInstances.get(0);
+                            serviceInstanceEvent.setId(serviceInstance.get("service-instance-id").asText());
+                            serviceInstanceEvent
+                                .setHref("service/" + serviceInstance.get("service-instance-id").asText());
+                            serviceInstanceEvent.setState(serviceInstance.get("orchestration-status").asText());
+                            if (action.equals("CREATE")) {
+                                if (logger.isDebugEnabled()) {
+                                    logger.debug("sending service inventory event to listeners");
+                                }
+                                processEvent(
+                                    EventFactory.getEvent(EventType.SERVICE_CREATION, serviceInstanceEvent));
+                            } else if (action.equals("DELETE")) {
+                                processEvent(EventFactory.getEvent(EventType.SERVICE_REMOVE, serviceInstanceEvent));
+                            } else if (action.equals("UPDATE")) {
+                                processEvent(EventFactory.getEvent(EventType.SERVICE_ATTRIBUTE_VALUE_CHANGE,
+                                    serviceInstanceEvent));
+                            }
+
+
+                        }
+
+                    }
+
+                } catch (JsonParseException e) {
+                    logger.error(" unable to Parse AAI Event JSON String {}, exception is", aaiEventString,
+                        e.getMessage());
+                } catch (JsonMappingException e) {
+                    logger.error(" unable to Map AAI Event JSON String {} to Java Pojo, exception is",
+                        aaiEventString, e.getMessage());
+                } catch (IOException e) {
+                    logger.error("IO Error when parsing AAI Event JSON String {} ", aaiEventString,
+                        e.getMessage());
+                }
+            }
+        }
+    }
+
+    public void checkForDMaaPSDCEvents() {
+        List<String> dmaapResponse = callDMaaPGetEvents(sdcTopic);
+        if (!CollectionUtils.isEmpty(dmaapResponse)) {
+            for (int i = 0; i < dmaapResponse.size(); i++) {
+                String sdcEventString = dmaapResponse.get(i);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("sdc event returned was {}", sdcEventString);
+                }
+                processEvent(EventFactory.getEvent(EventType.SDC_DISTRIBUTION, sdcEventString));
+            }
+        }
+    }
+
+
+    public List<String> callDMaaPGetEvents(String topic) {
+
+        URI callURI = buildRequest(topic);
+        ResponseEntity<Object> response = callDMaaP(callURI);
+        if (response != null) {
+            return (List<String>) response.getBody();
+
+        } else {
+            return null;
+        }
+    }
+
+    public ResponseEntity<Object> callCheckConnectivity() {
+        URI callURI = buildRequest(null);
+
+        ResponseEntity<Object> response = restTemplate.exchange(callURI, HttpMethod.GET, buildRequestHeader(),
+            Object.class);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("response body : {} ", response.getBody().toString());
+            logger.debug("response status : {}", response.getStatusCodeValue());
+        }
+        return response;
+
+    }
+
+
+    private URI buildRequest(String topic) {
+        if (StringUtils.isEmpty(topic)) {
+            topic = aaiTopic;
+        }
+        String dmaapGetEventsUrlFormated = dmaapGetEventsUrl.replace("$topic", topic);
+        dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumergroup", consumerGroup);
+        dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumerid", consumerId);
+        dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$timeout", timeout);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Calling DMaaP Url : " + dmaapGetEventsUrlFormated);
+        }
+        UriComponentsBuilder callURI = UriComponentsBuilder.fromHttpUrl(dmaapGetEventsUrlFormated);
+        return callURI.build().encode().toUri();
+    }
+
+    private ResponseEntity<Object> callDMaaP(URI callURI) {
+        try {
+            ResponseEntity<Object> response =
+                restTemplate.exchange(callURI, HttpMethod.GET, buildRequestHeader(), Object.class);
+            if (logger.isDebugEnabled()) {
+                logger.debug("response body : {} ", response.getBody().toString());
+                logger.debug("response status : {}", response.getStatusCodeValue());
+            }
+            return response;
+        } catch (Exception e) {
+            String message = MessageFormat
+                .format("Exception while calling dmaap : {0}", callURI);
+            logger.error(message);
+            return null;
+        }
+
+    }
+
+
+    private HttpEntity<String> buildRequestHeader() {
+        HttpHeaders httpHeaders = new HttpHeaders();
+        httpHeaders.add("Accept", "application/json");
+        httpHeaders.add("Content-Type", "application/json");
+        return new HttpEntity<>("parameters", httpHeaders);
+    }
+
+    /**
+     * Retrieve subscribers that match an event and fire notification asynchronously
+     */
+    private void processEvent(Event event) {
+        subscriberRepository.findSubscribersUsingEvent(event).forEach(sub -> notifier.run(sub, event));
+    }
+
+}
diff --git a/src/main/java/org/onap/nbi/apis/hub/service/dmaap/DMaaPEventsScheduler.java b/src/main/java/org/onap/nbi/apis/hub/service/dmaap/DMaaPEventsScheduler.java
new file mode 100644 (file)
index 0000000..cdd1825
--- /dev/null
@@ -0,0 +1,38 @@
+/**
+ * Copyright (c) 2019 Huawei
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.onap.nbi.apis.hub.service.dmaap;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Profile;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+@Profile("default")
+@Service
+@EnableScheduling
+public class DMaaPEventsScheduler {
+
+    @Autowired
+    CheckDMaaPEventsManager checkDMaaPEventsManager;
+
+    @Scheduled(fixedDelayString = "${dmaapCheck.schedule}",
+        initialDelayString = "${dmaapCheck.initial}")
+    private void processDMaaPEvents() {
+        checkDMaaPEventsManager.checkForDMaaPAAIEvents();
+        checkDMaaPEventsManager.checkForDMaaPSDCEvents();
+
+    }
+}
+
index d9bedc0..c3eb746 100644 (file)
@@ -13,7 +13,7 @@
 package org.onap.nbi.apis.status;
 
 import java.text.MessageFormat;
-import org.onap.nbi.apis.hub.service.CheckDMaaPEventsManager;
+import org.onap.nbi.apis.hub.service.dmaap.CheckDMaaPEventsManager;
 import org.onap.nbi.apis.servicecatalog.SdcClient;
 import org.onap.nbi.apis.serviceinventory.AaiClient;
 import org.onap.nbi.apis.serviceorder.SoClient;
@@ -59,7 +59,7 @@ public class OnapClient {
                     soClient.callCheckConnectivity();
                     break;
                 case DMAAP:
-                    checkDMaaPEventsManager.callDMaaPGetEvents();
+                    checkDMaaPEventsManager.callCheckConnectivity();
                     break;
             }
         } catch (BackendFunctionalException e) {
index 028040d..ed34f0a 100644 (file)
@@ -64,7 +64,8 @@ so.project.name                     = Project-generic
 
 # DMAAP
 dmaap.host                          = http://127.0.0.1:8091
-dmaap.topic                         = AAI-EVENT
+dmaap.aai.topic                     = AAI-EVENT
+dmaap.sdc.topic                     = SDC-DISTR-NOTIF-TOPIC-AUTO
 dmaap.consumergroup                 = NBICG1
 dmaap.consumerid                    = NBIC1
 dmaap.timeout                       = 2000
index 741682e..fe7dde0 100644 (file)
@@ -72,7 +72,8 @@ so.project.name                      = Project-generic
 
 # DMAAP
 dmaap.host                           = http://10.0.6.1:3904
-dmaap.topic                          = AAI-EVENT
+dmaap.aai.topic                      = AAI-EVENT
+dmaap.sdc.topic                      = SDC-DISTR-NOTIF-TOPIC-AUTO
 dmaap.consumergroup                  = NBICG1
 dmaap.consumerid                     = NBIC1
 dmaap.timeout                        = 2000