Remove DMaaP dependency from AAI-Common
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / kafka / AAIKafkaEventJMSConsumer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Modifications Copyright © 2018 IBM.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23  package org.onap.aai.kafka;
24
25  import java.util.Map;
26  import java.util.Objects;
27  
28  import javax.jms.JMSException;
29  import javax.jms.Message;
30  import javax.jms.MessageListener;
31  import javax.jms.TextMessage;
32  
33  import org.json.JSONException;
34  import org.json.JSONObject;
35  import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
36  import org.onap.aai.exceptions.AAIException;
37  import org.onap.aai.logging.AaiElsErrorCode;
38  import org.onap.aai.logging.ErrorLogHelper;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  import org.slf4j.MDC;
42  import org.springframework.core.env.Environment;
43  import org.springframework.kafka.core.KafkaTemplate;
44  
45  public class AAIKafkaEventJMSConsumer implements MessageListener {
46  
47      private static final String EVENT_TOPIC = "event-topic";
48  
49      private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
50  
51      private Environment environment;
52      private Map<String, String> mdcCopy;
53      private KafkaTemplate<String,String> kafkaTemplate;
54  
55      public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
56          super();
57          mdcCopy = MDC.getCopyOfContextMap();
58          Objects.nonNull(environment);
59          this.environment = environment;
60          this.kafkaTemplate=kafkaTemplate;
61      }
62  
63      @Override
64      public void onMessage(Message message) {
65  
66          if (kafkaTemplate == null) {
67              return;
68          }
69  
70          String jsmMessageTxt = "";
71          String aaiEvent = "";
72          JSONObject aaiEventHeader;
73          JSONObject joPayload;
74          String transactionId = "";
75          String serviceName = "";
76          String eventName = "";
77          String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
78          String errorDescription = "";
79  
80          if (mdcCopy != null) {
81              MDC.setContextMap(mdcCopy);
82          }
83  
84          if (message instanceof TextMessage) {
85              AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
86              try {
87                  jsmMessageTxt = ((TextMessage) message).getText();
88                  JSONObject jo = new JSONObject(jsmMessageTxt);
89                  if (jo.has("aaiEventPayload")) {
90                      joPayload = jo.getJSONObject("aaiEventPayload");
91                      aaiEvent = joPayload.toString();
92                  } else {
93                      return;
94                  }
95                  if (jo.getString(EVENT_TOPIC) != null) {
96                      eventName = jo.getString(EVENT_TOPIC);
97                  }
98                  if (joPayload.has("event-header")) {
99                      try {
100                          aaiEventHeader = joPayload.getJSONObject("event-header");
101                          if (aaiEventHeader.has("id")) {
102                              transactionId = aaiEventHeader.get("id").toString();
103                          }
104                          if (aaiEventHeader.has("entity-link")) {
105                              serviceName = aaiEventHeader.get("entity-link").toString();
106                          }
107                      } catch (JSONException jexc) {
108                          // ignore, this is just used for logging
109                      }
110                  }
111                  metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
112  
113  
114                  if ("AAI-EVENT".equals(eventName)) {
115                      // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
116                      kafkaTemplate.send(eventName,aaiEvent);
117  
118                  } else {
119                      LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
120                  }
121              } catch (JMSException | JSONException e) {
122                  aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
123                  errorDescription = e.getMessage();
124                  ErrorLogHelper.logException(new AAIException("AAI_7350"));
125              } catch (Exception e) {
126                  aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
127                  errorDescription = e.getMessage();
128                  ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
129              } finally {
130                  metricLog.post(aaiElsErrorCode, errorDescription);
131              }
132          }
133      }
134  }
135