2 * Copyright (c) 2019 Huawei
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5 * the License. You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11 * specific language governing permissions and limitations under the License.
14 package org.onap.nbi.apis.hub.service.dmaap;
16 import com.fasterxml.jackson.core.JsonParseException;
17 import com.fasterxml.jackson.databind.JsonMappingException;
18 import com.fasterxml.jackson.databind.JsonNode;
19 import com.fasterxml.jackson.databind.ObjectMapper;
20 import java.io.IOException;
22 import java.text.MessageFormat;
23 import java.util.List;
24 import javax.annotation.PostConstruct;
25 import org.apache.commons.lang3.StringUtils;
26 import org.onap.nbi.OnapComponentsUrlPaths;
27 import org.onap.nbi.apis.hub.model.Event;
28 import org.onap.nbi.apis.hub.model.EventType;
29 import org.onap.nbi.apis.hub.model.ServiceInstanceEvent;
30 import org.onap.nbi.apis.hub.repository.SubscriberRepository;
31 import org.onap.nbi.apis.hub.service.EventFactory;
32 import org.onap.nbi.apis.hub.service.NotifierService;
33 import org.onap.nbi.apis.serviceorder.model.RelatedParty;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.beans.factory.annotation.Value;
38 import org.springframework.http.HttpEntity;
39 import org.springframework.http.HttpHeaders;
40 import org.springframework.http.HttpMethod;
41 import org.springframework.http.ResponseEntity;
42 import org.springframework.stereotype.Service;
43 import org.springframework.util.CollectionUtils;
44 import org.springframework.web.client.RestTemplate;
45 import org.springframework.web.util.UriComponentsBuilder;
49 public class CheckDMaaPEventsManager {
51 public static final String RESPONSE_STATUS = "response status : ";
52 public static final String RETURNS = " returns ";
53 public static final String ERROR_ON_CALLING = "error on calling ";
54 private final Logger logger = LoggerFactory.getLogger(CheckDMaaPEventsManager.class);
56 private RestTemplate restTemplate;
58 private SubscriberRepository subscriberRepository;
60 private NotifierService notifier;
61 @Value("${dmaap.host}")
62 private String dmaapHostname;
63 @Value("${dmaap.aai.topic}")
64 private String aaiTopic;
65 @Value("${dmaap.sdc.topic}")
66 private String sdcTopic;
67 @Value("${dmaap.consumergroup}")
68 private String consumerGroup;
69 @Value("${dmaap.consumerid}")
70 private String consumerId;
71 @Value("${dmaap.timeout}")
72 private String timeout;
73 private String dmaapGetEventsUrl;
76 private void setUpAndLogDMaaPUrl() {
77 dmaapGetEventsUrl = new StringBuilder().append(dmaapHostname)
78 .append(OnapComponentsUrlPaths.DMAAP_CONSUME_EVENTS).toString();
79 logger.info("DMaaP Get Events url : " + dmaapGetEventsUrl);
82 public void checkForDMaaPAAIEvents() {
83 ObjectMapper mapper = new ObjectMapper();
84 List<String> dmaapResponse = callDMaaPGetEvents(aaiTopic);
85 if (!CollectionUtils.isEmpty(dmaapResponse)) {
86 for (int i = 0; i < dmaapResponse.size(); i++) {
87 String aaiEventString = dmaapResponse.get(i);
88 if (logger.isDebugEnabled()) {
89 logger.debug("aai event returned was {}", aaiEventString);
92 JsonNode jsonNode = mapper.readValue(aaiEventString, JsonNode.class);
93 JsonNode eventHeader = jsonNode.get("event-header");
94 String aaiEventEntityType = eventHeader.get("entity-type").asText();
95 String action = eventHeader.get("action").asText();
96 if (logger.isDebugEnabled()) {
97 logger.debug("aaiEventEntityType is {} and action is {}", aaiEventEntityType, action);
99 if (aaiEventEntityType.equals("service-instance")) {
101 // parse the AAI-EVENT service-instance tree
102 ServiceInstanceEvent serviceInstanceEvent = new ServiceInstanceEvent();
103 RelatedParty relatedParty = new RelatedParty();
104 JsonNode entity = jsonNode.get("entity");
105 relatedParty.setId(entity.get("global-customer-id").asText());
106 relatedParty.setName(entity.get("subscriber-name").asText());
107 serviceInstanceEvent.setRelatedParty(relatedParty);
108 JsonNode childServiceSubscription = entity.get("service-subscriptions");
109 JsonNode serviceSubscriptions = childServiceSubscription.get("service-subscription");
110 JsonNode serviceSubscription = serviceSubscriptions.get(0);
111 String serviceSubscriptionPrint = serviceSubscription.toString();
112 JsonNode childserviceInstances = serviceSubscription.get("service-instances");
113 JsonNode serviceInstances = childserviceInstances.get("service-instance");
114 JsonNode serviceInstance = serviceInstances.get(0);
115 serviceInstanceEvent.setId(serviceInstance.get("service-instance-id").asText());
117 .setHref("service/" + serviceInstance.get("service-instance-id").asText());
118 serviceInstanceEvent.setState(serviceInstance.get("orchestration-status").asText());
119 if (action.equals("CREATE")) {
120 if (logger.isDebugEnabled()) {
121 logger.debug("sending service inventory event to listeners");
124 EventFactory.getEvent(EventType.SERVICE_CREATION, serviceInstanceEvent));
125 } else if (action.equals("DELETE")) {
126 processEvent(EventFactory.getEvent(EventType.SERVICE_REMOVE, serviceInstanceEvent));
127 } else if (action.equals("UPDATE")) {
128 processEvent(EventFactory.getEvent(EventType.SERVICE_ATTRIBUTE_VALUE_CHANGE,
129 serviceInstanceEvent));
137 } catch (JsonParseException e) {
138 logger.error(" unable to Parse AAI Event JSON String {}, exception is", aaiEventString,
140 } catch (JsonMappingException e) {
141 logger.error(" unable to Map AAI Event JSON String {} to Java Pojo, exception is",
142 aaiEventString, e.getMessage());
143 } catch (IOException e) {
144 logger.error("IO Error when parsing AAI Event JSON String {} ", aaiEventString,
151 public void checkForDMaaPSDCEvents() {
152 List<String> dmaapResponse = callDMaaPGetEvents(sdcTopic);
153 if (!CollectionUtils.isEmpty(dmaapResponse)) {
154 for (int i = 0; i < dmaapResponse.size(); i++) {
155 String sdcEventString = dmaapResponse.get(i);
156 if (logger.isDebugEnabled()) {
157 logger.debug("sdc event returned was {}", sdcEventString);
159 processEvent(EventFactory.getEvent(EventType.SDC_DISTRIBUTION, sdcEventString));
165 public List<String> callDMaaPGetEvents(String topic) {
167 URI callURI = buildRequest(topic);
168 ResponseEntity<Object> response = callDMaaP(callURI);
169 if (response != null) {
170 return (List<String>) response.getBody();
177 public ResponseEntity<Object> callCheckConnectivity() {
178 URI callURI = buildRequest(null);
180 ResponseEntity<Object> response = restTemplate.exchange(callURI, HttpMethod.GET, buildRequestHeader(),
183 if (logger.isDebugEnabled()) {
184 logger.debug("response body : {} ", response.getBody().toString());
185 logger.debug("response status : {}", response.getStatusCodeValue());
192 private URI buildRequest(String topic) {
193 if (StringUtils.isEmpty(topic)) {
196 String dmaapGetEventsUrlFormated = dmaapGetEventsUrl.replace("$topic", topic);
197 dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumergroup", consumerGroup);
198 dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$consumerid", consumerId);
199 dmaapGetEventsUrlFormated = dmaapGetEventsUrlFormated.replace("$timeout", timeout);
200 if (logger.isDebugEnabled()) {
201 logger.debug("Calling DMaaP Url : " + dmaapGetEventsUrlFormated);
203 UriComponentsBuilder callURI = UriComponentsBuilder.fromHttpUrl(dmaapGetEventsUrlFormated);
204 return callURI.build().encode().toUri();
207 private ResponseEntity<Object> callDMaaP(URI callURI) {
209 ResponseEntity<Object> response =
210 restTemplate.exchange(callURI, HttpMethod.GET, buildRequestHeader(), Object.class);
211 if (logger.isDebugEnabled()) {
212 logger.debug("response body : {} ", response.getBody().toString());
213 logger.debug("response status : {}", response.getStatusCodeValue());
216 } catch (Exception e) {
217 String message = MessageFormat
218 .format("Exception while calling dmaap : {0}", callURI);
219 logger.error(message);
226 private HttpEntity<String> buildRequestHeader() {
227 HttpHeaders httpHeaders = new HttpHeaders();
228 httpHeaders.add("Accept", "application/json");
229 httpHeaders.add("Content-Type", "application/json");
230 return new HttpEntity<>("parameters", httpHeaders);
234 * Retrieve subscribers that match an event and fire notification asynchronously
236 private void processEvent(Event event) {
237 subscriberRepository.findSubscribersUsingEvent(event).forEach(sub -> notifier.run(sub, event));