Remove DMaaP dependency from AAI-Common 59/137659/3
authorSingh <soumya.e.singh@accenture.com>
Tue, 9 Apr 2024 12:08:07 +0000 (17:38 +0530)
committerSingh <soumya.e.singh@accenture.com>
Wed, 10 Apr 2024 10:06:25 +0000 (15:36 +0530)
- Remove Dmaap dependency in AAI-Common and replace it with Kafka.

Issue-ID: AAI-3792
Change-Id: If3fd5c3bdc2448f7e260a26000b02a510c80d7fb
Signed-off-by: Singh <soumya.e.singh@accenture.com>
22 files changed:
aai-annotations/src/main/java/org/onap/aai/schema/enums/ObjectMetadata.java
aai-core/pom.xml
aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java [deleted file]
aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java [new file with mode: 0644]
aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java [moved from aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java with 95% similarity]
aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java [moved from aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java with 97% similarity]
aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java [deleted file]
aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java [deleted file]
aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java [new file with mode: 0644]
aai-core/src/main/resources/logback.xml
aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java [new file with mode: 0644]
aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java
aai-core/src/test/resources/logback.xml
aai-core/src/test/resources/payloads/resource/aai-event.json [new file with mode: 0644]
aai-core/src/test/resources/payloads/resource/aai-invalid-event.json [new file with mode: 0644]
aai-els-onap-logging/src/main/java/org/onap/aai/aailog/logs/AaiDmaapMetricLog.java
aai-els-onap-logging/src/main/java/org/onap/logging/filter/base/ONAPComponents.java
aai-els-onap-logging/src/test/resources/error.properties
aai-parent/pom.xml
docs/AAI REST API Documentation/AAIRESTAPI.rst

index 406800f..3443d32 100644 (file)
@@ -28,7 +28,7 @@ public enum ObjectMetadata {
     DESCRIPTION("description"),
     /**
      * names of properties to appear in relationship-lists
-     * and parent objects in DMaaP messages
+     * and parent objects in Kafka messages
      * <br>
      * <b>comma separated list</b>
      */
index fb64669..e48e8f5 100644 (file)
@@ -135,6 +135,17 @@ limitations under the License.
                        <artifactId>commons-text</artifactId>
                        <scope>compile</scope>
                </dependency>
+               <dependency>
+               <groupId>org.junit.vintage</groupId>
+               <artifactId>junit-vintage-engine</artifactId>
+               <scope>test</scope>
+               <exclusions>
+                       <exclusion>
+                               <groupId>org.hamcrest</groupId>
+                               <artifactId>hamcrest-core</artifactId>
+                       </exclusion>
+               </exclusions>
+               </dependency>
                <dependency>
                        <groupId>com.att.eelf</groupId>
                        <artifactId>eelf-core</artifactId>
@@ -190,6 +201,16 @@ limitations under the License.
                        <groupId>com.fasterxml.jackson.jaxrs</groupId>
                        <artifactId>jackson-jaxrs-json-provider</artifactId>
                </dependency>
+               <dependency>
+               <groupId>org.springframework.kafka</groupId>
+               <artifactId>spring-kafka</artifactId>
+                       <version>2.7.14</version>
+               </dependency>
+               <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
                <dependency>
                        <groupId>com.googlecode.json-simple</groupId>
                        <artifactId>json-simple</artifactId>
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java
deleted file mode 100644 (file)
index d3addeb..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- *  Modifications Copyright © 2018 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.dmaap;
-
-import java.util.Map;
-import java.util.Objects;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
-import org.onap.aai.exceptions.AAIException;
-import org.onap.aai.logging.AaiElsErrorCode;
-import org.onap.aai.logging.ErrorLogHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.core.env.Environment;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.web.client.RestTemplate;
-
-public class AAIDmaapEventJMSConsumer implements MessageListener {
-
-    private static final String EVENT_TOPIC = "event-topic";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class);
-
-    private RestTemplate restTemplate;
-
-    private HttpHeaders httpHeaders;
-
-    private Environment environment;
-    private Map<String, String> mdcCopy;
-
-    public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) {
-        super();
-        mdcCopy = MDC.getCopyOfContextMap();
-        Objects.nonNull(environment);
-        Objects.nonNull(restTemplate);
-        Objects.nonNull(httpHeaders);
-        this.environment = environment;
-        this.restTemplate = restTemplate;
-        this.httpHeaders = httpHeaders;
-    }
-
-    @Override
-    public void onMessage(Message message) {
-
-        if (restTemplate == null) {
-            return;
-        }
-
-        String jsmMessageTxt = "";
-        String aaiEvent = "";
-        JSONObject aaiEventHeader;
-        JSONObject joPayload;
-        String transactionId = "";
-        String serviceName = "";
-        String eventName = "";
-        String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
-        String errorDescription = "";
-
-        if (mdcCopy != null) {
-            MDC.setContextMap(mdcCopy);
-        }
-
-        if (message instanceof TextMessage) {
-            AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
-            try {
-                jsmMessageTxt = ((TextMessage) message).getText();
-                JSONObject jo = new JSONObject(jsmMessageTxt);
-                if (jo.has("aaiEventPayload")) {
-                    joPayload = jo.getJSONObject("aaiEventPayload");
-                    aaiEvent = joPayload.toString();
-                } else {
-                    return;
-                }
-                if (jo.getString(EVENT_TOPIC) != null) {
-                    eventName = jo.getString(EVENT_TOPIC);
-                }
-                if (joPayload.has("event-header")) {
-                    try {
-                        aaiEventHeader = joPayload.getJSONObject("event-header");
-                        if (aaiEventHeader.has("id")) {
-                            transactionId = aaiEventHeader.get("id").toString();
-                        }
-                        if (aaiEventHeader.has("entity-link")) {
-                            serviceName = aaiEventHeader.get("entity-link").toString();
-                        }
-                    } catch (JSONException jexc) {
-                        // ignore, this is just used for logging
-                    }
-                }
-                metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-
-                HttpEntity<String> httpEntity = new HttpEntity<String>(aaiEvent, httpHeaders);
-
-                String transportType = environment.getProperty("dmaap.ribbon.transportType", "http");
-                String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers");
-                String endpoint = "/events/" + eventName;
-
-                if ("AAI-EVENT".equals(eventName)) {
-                    restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
-                } else {
-                    LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
-                }
-            } catch (JMSException | JSONException e) {
-                aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
-                errorDescription = e.getMessage();
-                ErrorLogHelper.logException(new AAIException("AAI_7350"));
-            } catch (Exception e) {
-                aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
-                errorDescription = e.getMessage();
-                ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
-            } finally {
-                metricLog.post(aaiElsErrorCode, errorDescription);
-            }
-        }
-    }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
new file mode 100644 (file)
index 0000000..731f3df
--- /dev/null
@@ -0,0 +1,135 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ *  Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.kafka;
+
+ import java.util.Map;
+ import java.util.Objects;
+ import javax.jms.JMSException;
+ import javax.jms.Message;
+ import javax.jms.MessageListener;
+ import javax.jms.TextMessage;
+ import org.json.JSONException;
+ import org.json.JSONObject;
+ import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+ import org.onap.aai.exceptions.AAIException;
+ import org.onap.aai.logging.AaiElsErrorCode;
+ import org.onap.aai.logging.ErrorLogHelper;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.slf4j.MDC;
+ import org.springframework.core.env.Environment;
+ import org.springframework.kafka.core.KafkaTemplate;
+ public class AAIKafkaEventJMSConsumer implements MessageListener {
+     private static final String EVENT_TOPIC = "event-topic";
+     private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+     private Environment environment;
+     private Map<String, String> mdcCopy;
+     private KafkaTemplate<String,String> kafkaTemplate;
+     public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
+         super();
+         mdcCopy = MDC.getCopyOfContextMap();
+         Objects.nonNull(environment);
+         this.environment = environment;
+         this.kafkaTemplate=kafkaTemplate;
+     }
+     @Override
+     public void onMessage(Message message) {
+         if (kafkaTemplate == null) {
+             return;
+         }
+         String jsmMessageTxt = "";
+         String aaiEvent = "";
+         JSONObject aaiEventHeader;
+         JSONObject joPayload;
+         String transactionId = "";
+         String serviceName = "";
+         String eventName = "";
+         String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+         String errorDescription = "";
+         if (mdcCopy != null) {
+             MDC.setContextMap(mdcCopy);
+         }
+         if (message instanceof TextMessage) {
+             AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+             try {
+                 jsmMessageTxt = ((TextMessage) message).getText();
+                 JSONObject jo = new JSONObject(jsmMessageTxt);
+                 if (jo.has("aaiEventPayload")) {
+                     joPayload = jo.getJSONObject("aaiEventPayload");
+                     aaiEvent = joPayload.toString();
+                 } else {
+                     return;
+                 }
+                 if (jo.getString(EVENT_TOPIC) != null) {
+                     eventName = jo.getString(EVENT_TOPIC);
+                 }
+                 if (joPayload.has("event-header")) {
+                     try {
+                         aaiEventHeader = joPayload.getJSONObject("event-header");
+                         if (aaiEventHeader.has("id")) {
+                             transactionId = aaiEventHeader.get("id").toString();
+                         }
+                         if (aaiEventHeader.has("entity-link")) {
+                             serviceName = aaiEventHeader.get("entity-link").toString();
+                         }
+                     } catch (JSONException jexc) {
+                         // ignore, this is just used for logging
+                     }
+                 }
+                 metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
+                 if ("AAI-EVENT".equals(eventName)) {
+                     // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
+                     kafkaTemplate.send(eventName,aaiEvent);
+                 } else {
+                     LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
+                 }
+             } catch (JMSException | JSONException e) {
+                 aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+                 errorDescription = e.getMessage();
+                 ErrorLogHelper.logException(new AAIException("AAI_7350"));
+             } catch (Exception e) {
+                 aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+                 errorDescription = e.getMessage();
+                 ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
+             } finally {
+                 metricLog.post(aaiElsErrorCode, errorDescription);
+             }
+         }
+     }
+ }
\ No newline at end of file
@@ -20,7 +20,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -29,11 +29,11 @@ import org.onap.aai.util.AAIConfig;
 import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.jms.core.JmsTemplate;
 
-public class AAIDmaapEventJMSProducer implements MessageProducer {
+public class AAIKafkaEventJMSProducer implements MessageProducer {
 
     private JmsTemplate jmsTemplate;
 
-    public AAIDmaapEventJMSProducer() {
+    public AAIKafkaEventJMSProducer() {
         if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
             this.jmsTemplate = new JmsTemplate();
             String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
@@ -18,7 +18,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
 
 import org.json.JSONObject;
 
index 598ef23..6f3e888 100644 (file)
@@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity;
 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
 import org.json.JSONException;
 import org.json.JSONObject;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
 import org.onap.aai.domain.notificationEvent.NotificationEvent;
 import org.onap.aai.exceptions.AAIException;
 import org.onap.aai.introspection.Introspector;
 import org.onap.aai.introspection.Loader;
 import org.onap.aai.introspection.exceptions.AAIUnknownObjectException;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
@@ -59,12 +59,12 @@ public class StoreNotificationEvent {
      * Instantiates a new store notification event.
      */
     public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
-        this.messageProducer = new AAIDmaapEventJMSProducer();
+        this.messageProducer = new AAIKafkaEventJMSProducer();
         this.transactionId = transactionId;
         this.sourceOfTruth = sourceOfTruth;
     }
 
-    public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+    public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
         this.messageProducer = producer;
         this.transactionId = transactionId;
         this.sourceOfTruth = sourceOfTruth;
@@ -139,7 +139,7 @@ public class StoreNotificationEvent {
         try {
             PojoUtils pu = new PojoUtils();
             String entityJson = pu.getJsonFromObject(ne);
-            sendToDmaapJmsQueue(entityJson);
+            sendToKafkaJmsQueue(entityJson);
             return entityJson;
         } catch (Exception e) {
             throw new AAIException("AAI_7350", e);
@@ -227,7 +227,7 @@ public class StoreNotificationEvent {
             marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false);
             marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false);
             marshaller.marshal(notificationEvent, result);
-            this.sendToDmaapJmsQueue(result.toString());
+            this.sendToKafkaJmsQueue(result.toString());
 
         } catch (Exception e) {
             throw new AAIException("AAI_7350", e);
@@ -380,7 +380,7 @@ public class StoreNotificationEvent {
             notificationEvent.setValue("entity", obj.getUnderlyingObject());
 
             String entityJson = notificationEvent.marshal(false);
-            sendToDmaapJmsQueue(entityJson);
+            sendToKafkaJmsQueue(entityJson);
             return entityJson;
         } catch (JSONException e) {
             throw new AAIException("AAI_7350", e);
@@ -389,7 +389,7 @@ public class StoreNotificationEvent {
         }
     }
 
-    private void sendToDmaapJmsQueue(String entityString) throws JSONException {
+    private void sendToKafkaJmsQueue(String entityString) throws JSONException {
 
         JSONObject entityJsonObject = new JSONObject(entityString);
 
index 10d47cd..6fbc297 100644 (file)
@@ -28,8 +28,8 @@ import java.util.Date;
 import java.util.Map;
 
 import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
 import org.onap.aai.util.AAIConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class DeltaEvents {
     private MessageProducer messageProducer;
 
     public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
-        this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer());
+        this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
     }
 
     public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java
deleted file mode 100644 (file)
index 078ca96..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Profile;
-import org.springframework.http.HttpHeaders;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.web.client.RestTemplate;
-
-@Profile("dmaap")
-@Configuration
-public class DmaapConfig {
-
-    @Autowired
-    private ApplicationContext ctx;
-
-    @Autowired
-    @Qualifier("dmaapRestTemplate")
-    private RestTemplate dmaapRestTemplate;
-
-    @Autowired
-    @Qualifier("dmaapHeaders")
-    private HttpHeaders dmaapHeaders;
-
-    @Value("${jms.bind.address}")
-    private String bindAddress;
-
-    @PostConstruct
-    public void init() {
-        System.setProperty("activemq.tcp.url", bindAddress);
-    }
-
-    @Bean(destroyMethod = "stop")
-    public BrokerService brokerService() throws Exception {
-
-        BrokerService broker = new BrokerService();
-        broker.addConnector(bindAddress);
-        broker.setPersistent(false);
-        broker.setUseJmx(false);
-        broker.setSchedulerSupport(false);
-        broker.start();
-
-        return broker;
-    }
-
-    @Bean(name = "connectionFactory")
-    public ActiveMQConnectionFactory activeMQConnectionFactory() {
-        return new ActiveMQConnectionFactory(bindAddress);
-    }
-
-    @Bean
-    public CachingConnectionFactory cachingConnectionFactory() {
-        return new CachingConnectionFactory(activeMQConnectionFactory());
-    }
-
-    @Bean(name = "destinationQueue")
-    public ActiveMQQueue activeMQQueue() {
-        return new ActiveMQQueue("IN_QUEUE");
-    }
-
-    @Bean
-    public JmsTemplate jmsTemplate() {
-        JmsTemplate jmsTemplate = new JmsTemplate();
-
-        jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
-        jmsTemplate.setDefaultDestination(activeMQQueue());
-
-        return jmsTemplate;
-    }
-
-    @Bean
-    public AAIDmaapEventJMSProducer jmsProducer() {
-        return new AAIDmaapEventJMSProducer();
-    }
-
-    @Bean(name = "jmsConsumer")
-    public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception {
-        return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders);
-    }
-
-    @Bean
-    public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-
-        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-
-        messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
-        messageListenerContainer.setDestinationName("IN_QUEUE");
-        messageListenerContainer.setMessageListener(jmsConsumer());
-
-        return messageListenerContainer;
-    }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java
deleted file mode 100644 (file)
index cbd9b10..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Base64;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.web.client.RestTemplate;
-
-@Configuration
-public class EventClientPublisher {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class);
-
-    @Value("${dmaap.ribbon.listOfServers:}")
-    private String hosts;
-
-    @Value("${dmaap.ribbon.username:}")
-    private String username;
-
-    @Value("${dmaap.ribbon.password:}")
-    private String password;
-
-    @Value("${dmaap.ribbon.topic:AAI-EVENT}")
-    private String topic;
-
-    @Value("${dmaap.ribbon.batchSize:100}")
-    private int maxBatchSize;
-
-    @Value("${dmaap.ribbon.maxAgeMs:250}")
-    private int maxAgeMs;
-
-    @Value("${dmaap.ribbon.delayBetweenBatches:100}")
-    private int delayBetweenBatches;
-
-    @Value("${dmaap.ribbon.protocol:http}")
-    private String protocol;
-
-    @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}")
-    private String tranportType;
-
-    @Value("${dmaap.ribbon.contentType:application/json}")
-    private String contentType;
-
-    @Bean(name = "dmaapRestTemplate")
-    public RestTemplate dmaapRestTemplate() {
-        return new RestTemplate();
-    }
-
-    @Bean(name = "dmaapHeaders")
-    public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException {
-
-        HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
-
-        if (username != null && password != null) {
-
-            if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) {
-
-                byte[] userPass = (username + ":" + password).getBytes("UTF-8");
-
-                httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass));
-            }
-        }
-
-        return httpHeaders;
-    }
-
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
new file mode 100644 (file)
index 0000000..71ae5b6
--- /dev/null
@@ -0,0 +1,175 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.web;
+
+ import java.util.HashMap;
+ import java.util.Map;
+ import javax.annotation.PostConstruct;
+ import org.apache.activemq.ActiveMQConnectionFactory;
+ import org.apache.activemq.broker.BrokerService;
+ import org.apache.activemq.command.ActiveMQQueue;
+ import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.beans.factory.annotation.Value;
+ import org.springframework.context.ApplicationContext;
+ import org.springframework.context.annotation.Bean;
+ import org.springframework.context.annotation.Configuration;
+ import org.springframework.context.annotation.Profile;
+ import org.springframework.jms.connection.CachingConnectionFactory;
+ import org.springframework.jms.core.JmsTemplate;
+ import org.springframework.jms.listener.DefaultMessageListenerContainer;
+ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+ import org.springframework.kafka.core.KafkaTemplate;
+ import org.springframework.kafka.core.ProducerFactory;
+ @Profile("kafka")
+ @Configuration
+ public class KafkaConfig {
+     @Autowired
+     private ApplicationContext ctx;
+     @Value("${jms.bind.address}")
+     private String bindAddress;
+     @Value("${spring.kafka.producer.bootstrap-servers}")
+     private String bootstrapServers;
+     @Value("${spring.kafka.producer.properties.security.protocol}")
+     private String securityProtocol;
+     @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+     private String saslMechanism;
+     @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+     private String saslJaasConfig;
+     @Value("${spring.kafka.producer.retries}")
+     private Integer retries;
+     private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+     @PostConstruct
+     public void init() {
+         System.setProperty("activemq.tcp.url", bindAddress);
+     }
+     @Bean(destroyMethod = "stop")
+     public BrokerService brokerService() throws Exception {
+         BrokerService broker = new BrokerService();
+         broker.addConnector(bindAddress);
+         broker.setPersistent(false);
+         broker.setUseJmx(false);
+         broker.setSchedulerSupport(false);
+         broker.start();
+         return broker;
+     }
+     @Bean(name = "connectionFactory")
+     public ActiveMQConnectionFactory activeMQConnectionFactory() {
+         return new ActiveMQConnectionFactory(bindAddress);
+     }
+     @Bean
+     public CachingConnectionFactory cachingConnectionFactory() {
+         return new CachingConnectionFactory(activeMQConnectionFactory());
+     }
+     @Bean(name = "destinationQueue")
+     public ActiveMQQueue activeMQQueue() {
+         return new ActiveMQQueue("IN_QUEUE");
+     }
+     @Bean
+     public JmsTemplate jmsTemplate() {
+         JmsTemplate jmsTemplate = new JmsTemplate();
+         jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
+         jmsTemplate.setDefaultDestination(activeMQQueue());
+         return jmsTemplate;
+     }
+     @Bean
+     public AAIKafkaEventJMSProducer jmsProducer() {
+         return new AAIKafkaEventJMSProducer();
+     }
+     @Bean(name = "jmsConsumer")
+     public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
+         return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
+     }
+     @Bean
+     public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
+         DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+         messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
+         messageListenerContainer.setDestinationName("IN_QUEUE");
+         messageListenerContainer.setMessageListener(jmsConsumer());
+         return messageListenerContainer;
+     }
+     @Bean
+     public ProducerFactory<String, String> producerFactory() throws Exception {
+         Map<String, Object> props = new HashMap<>();
+         if(bootstrapServers == null){
+             logger.error("Environment Variable " + bootstrapServers + " is missing");
+             throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+         }
+         else{
+         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+         }
+         if(saslJaasConfig == null){
+             logger.info("Not using any authentication for kafka interaction");
+         }
+         else{
+             logger.info("Using authentication provided by kafka interaction");
+         // Strimzi Kafka security properties
+         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+         props.put("security.protocol", securityProtocol);
+         props.put("sasl.mechanism", saslMechanism);
+         props.put("sasl.jaas.config", saslJaasConfig);
+         props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
+         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
+         }
+         return new DefaultKafkaProducerFactory<>(props);
+     }
+     @Bean
+     public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
+         return new KafkaTemplate<>(producerFactory());
+     }
+ }
\ No newline at end of file
index f2b1399..fc66b32 100644 (file)
                <appender-ref ref="translog" />
        </appender>
 
-       <appender name="dmaapAAIEventConsumer"
+       <appender name="kafkaAAIEventConsumer"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                        <level>WARN</level>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                </encoder>
        </appender>
 
-       <appender name="dmaapAAIEventConsumerDebug"
+       <appender name="kafkaAAIEventConsumerDebug"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.LevelFilter">
                        <level>DEBUG</level>
                        <onMatch>ACCEPT</onMatch>
                        <onMismatch>DENY</onMismatch>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                        <pattern>${eelfLogPattern}</pattern>
                </encoder>
        </appender>
-       <appender name="dmaapAAIEventConsumerMetric"
+       <appender name="kafkaAAIEventConsumerMetric"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.LevelFilter">
                        <level>INFO</level>
                        <onMatch>ACCEPT</onMatch>
                        <onMismatch>DENY</onMismatch>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                <appender-ref ref="asyncAUDIT"/>
        </logger>
 
-       <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false">
-               <appender-ref ref="dmaapAAIEventConsumer" />
-               <appender-ref ref="dmaapAAIEventConsumerDebug" />
-               <appender-ref ref="dmaapAAIEventConsumerMetric" />
+       <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
+               <appender-ref ref="kafkaAAIEventConsumer" />
+               <appender-ref ref="kafkaAAIEventConsumerDebug" />
+               <appender-ref ref="kafkaAAIEventConsumerMetric" />
        </logger>
 
        <logger name="org.apache" level="OFF" />
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
new file mode 100644 (file)
index 0000000..c72499c
--- /dev/null
@@ -0,0 +1,89 @@
+package org.onap.aai.kafka;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.aai.PayloadUtil;
+import org.springframework.core.env.Environment;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@RunWith(MockitoJUnitRunner.class)
+@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
+public class AAIKafkaEventJMSConsumerTest {
+
+    @Mock
+    private Environment environment;
+
+    @Mock
+    private KafkaTemplate<String,String> kafkaTemplate;
+
+    private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer;
+
+    @Before
+    public void setUp(){
+        aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate);
+    }
+
+    @Test
+    public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
+    throws Exception
+    {
+        TextMessage mockTextMessage = mock(TextMessage.class);
+        String payload = PayloadUtil.getResourcePayload("aai-event.json");
+
+        when(mockTextMessage.getText()).thenReturn(payload);
+        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
+        verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString());
+    }
+
+    @Test
+    public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{
+        TextMessage mockTextMessage = mock(TextMessage.class);
+        String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json");
+        when(mockTextMessage.getText()).thenReturn(payload);
+        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
+    }
+
+
+    @Test
+    public void onMessage_shouldHandleJSONException() throws Exception {
+        // Arrange
+        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
+        TextMessage mockTextMessage = mock(TextMessage.class);
+        ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate
+
+        // Act
+        consumer.onMessage(mockTextMessage);
+
+        // Assert
+        // Verify that exception is logged
+    }
+
+    @Test
+    public void onMessage_shouldHandleGenericException() throws Exception {
+        // Arrange
+        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
+        TextMessage mockTextMessage = mock(TextMessage.class);
+        when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields
+
+        // Act
+        consumer.onMessage(mockTextMessage);
+
+        // Assert
+        // Verify that exception is logged
+    }
+
+}
index b4b8810..a0c3f63 100644 (file)
@@ -39,27 +39,27 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.onap.aai.AAISetup;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
 import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader;
 import org.onap.aai.exceptions.AAIException;
 import org.onap.aai.introspection.Introspector;
 import org.onap.aai.introspection.Loader;
 import org.onap.aai.introspection.ModelType;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
 
 public class StoreNotificationEventTest extends AAISetup {
 
-    private static AAIDmaapEventJMSProducer producer;
+    private static AAIKafkaEventJMSProducer producer;
     private static StoreNotificationEvent sne;
 
     @BeforeClass
     public static void setUp() {
-        producer = Mockito.mock(AAIDmaapEventJMSProducer.class);
+        producer = Mockito.mock(AAIKafkaEventJMSProducer.class);
         // sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth");
     }
 
     @Before
     public void setUpBefore() {
-        producer = Mockito.mock(AAIDmaapEventJMSProducer.class);
+        producer = Mockito.mock(AAIKafkaEventJMSProducer.class);
         sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth");
 
     }
index 5d4f7bf..4c82c0b 100644 (file)
                <appender-ref ref="translog" />
        </appender>
 
-       <appender name="dmaapAAIEventConsumer"
+       <appender name="kafkaAAIEventConsumer"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                        <level>WARN</level>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                </encoder>
        </appender>
 
-       <appender name="dmaapAAIEventConsumerDebug"
+       <appender name="kafkaAAIEventConsumerDebug"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.LevelFilter">
                        <level>DEBUG</level>
                        <onMatch>ACCEPT</onMatch>
                        <onMismatch>DENY</onMismatch>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                        <pattern>${eelfLogPattern}</pattern>
                </encoder>
        </appender>
-       <appender name="dmaapAAIEventConsumerMetric"
+       <appender name="kafkaAAIEventConsumerMetric"
                class="ch.qos.logback.core.rolling.RollingFileAppender">
                <filter class="ch.qos.logback.classic.filter.LevelFilter">
                        <level>INFO</level>
                        <onMatch>ACCEPT</onMatch>
                        <onMismatch>DENY</onMismatch>
                </filter>
-               <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File>
+               <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File>
                <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-                       <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
+                       <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
                        </fileNamePattern>
                </rollingPolicy>
                <encoder class="org.onap.aai.logging.EcompEncoder">
                <appender-ref ref="asyncAUDIT"/>
        </logger>
 
-       <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false">
-               <appender-ref ref="dmaapAAIEventConsumer" />
-               <appender-ref ref="dmaapAAIEventConsumerDebug" />
-               <appender-ref ref="dmaapAAIEventConsumerMetric" />
+       <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
+               <appender-ref ref="kafkaAAIEventConsumer" />
+               <appender-ref ref="kafkaAAIEventConsumerDebug" />
+               <appender-ref ref="kafkaAAIEventConsumerMetric" />
        </logger>
 
        <logger name="org.apache" level="WARN" />
diff --git a/aai-core/src/test/resources/payloads/resource/aai-event.json b/aai-core/src/test/resources/payloads/resource/aai-event.json
new file mode 100644 (file)
index 0000000..0fab96d
--- /dev/null
@@ -0,0 +1,64 @@
+{
+    "event-topic": "AAI-EVENT",
+    "aaiEventPayload": {
+        "cambria.partition": "AAI",
+        "event-header": {
+            "severity": "NORMAL",
+            "entity-type": "object-group",
+            "top-entity-type": "object-group",
+            "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+            "event-type": "AAI-EVENT",
+            "domain": "dev",
+            "action": "UPDATE",
+            "sequence-number": "0",
+            "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+            "source-name": "postman-api",
+            "version": "v28",
+            "timestamp": "20231207-12:14:44:757"
+        },
+        "entity": {
+            "relationship-list": {
+                "relationship": [
+                    {
+                        "related-to": "cell",
+                        "relationship-data": [
+                            {
+                                "relationship-value": "445611193273958916",
+                                "relationship-key": "cell.cell-id"
+                            }
+                        ],
+                        "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+                        "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                        "related-to-property": [
+                            {
+                                "property-key": "cell.cell-name",
+                                "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+                            }
+                        ]
+                    },
+                    {
+                        "related-to": "cell",
+                        "relationship-data": [
+                            {
+                                "relationship-value": "445611193272330241",
+                                "relationship-key": "cell.cell-id"
+                            }
+                        ],
+                        "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+                        "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                        "related-to-property": [
+                            {
+                                "property-key": "cell.cell-name",
+                                "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+                            }
+                        ]
+                    }
+                ]
+            },
+            "group-name": "Urban",
+            "resource-version": "1701951284582",
+            "group-type": "cell",
+            "object-group-id": "ric_cluster"
+        }
+    }
+}
\ No newline at end of file
diff --git a/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json
new file mode 100644 (file)
index 0000000..77b2fc1
--- /dev/null
@@ -0,0 +1,64 @@
+{
+    "event-topic": "AAI-INVALID-EVENT",
+    "aaiEventPayload": {
+        "cambria.partition": "AAI",
+        "event-header": {
+            "severity": "NORMAL",
+            "entity-type": "object-group",
+            "top-entity-type": "object-group",
+            "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+            "event-type": "AAI-EVENT",
+            "domain": "dev",
+            "action": "UPDATE",
+            "sequence-number": "0",
+            "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+            "source-name": "postman-api",
+            "version": "v28",
+            "timestamp": "20231207-12:14:44:757"
+        },
+        "entity": {
+            "relationship-list": {
+                "relationship": [
+                    {
+                        "related-to": "cell",
+                        "relationship-data": [
+                            {
+                                "relationship-value": "445611193273958916",
+                                "relationship-key": "cell.cell-id"
+                            }
+                        ],
+                        "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+                        "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                        "related-to-property": [
+                            {
+                                "property-key": "cell.cell-name",
+                                "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+                            }
+                        ]
+                    },
+                    {
+                        "related-to": "cell",
+                        "relationship-data": [
+                            {
+                                "relationship-value": "445611193272330241",
+                                "relationship-key": "cell.cell-id"
+                            }
+                        ],
+                        "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+                        "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                        "related-to-property": [
+                            {
+                                "property-key": "cell.cell-name",
+                                "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+                            }
+                        ]
+                    }
+                ]
+            },
+            "group-name": "Urban",
+            "resource-version": "1701951284582",
+            "group-type": "cell",
+            "object-group-id": "ric_cluster"
+        }
+    }
+}
\ No newline at end of file
index 0d3a573..8190dd1 100644 (file)
@@ -33,7 +33,7 @@ public class AaiDmaapMetricLog extends MDCSetup {
 
     protected static final Logger logger = LoggerFactory.getLogger(AaiDmaapMetricLog.class);
     private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE-RETURN");
-    private static final String TARGET_ENTITY = "DMaaP";
+    private static final String TARGET_ENTITY = "KAFKA";
 
     public AaiDmaapMetricLog() {
         if (MDC.get(ONAPLogConstants.MDCs.SERVER_FQDN) == null) {
index 5465b7c..b9cdd0e 100644 (file)
@@ -24,15 +24,12 @@ import java.util.EnumSet;
 import java.util.Set;
 
 public enum ONAPComponents implements ONAPComponentsList {
-    OPENSTACK_ADAPTER, BPMN, GRM, AAI, DMAAP, POLICY, CATALOG_DB, REQUEST_DB, SNIRO, SDC, EXTERNAL, VNF_ADAPTER, SDNC_ADAPTER, MULTICLOUD, CLAMP, PORTAL, VID, APPC, DCAE, HOLMES, SDNC, SO, VFC, ESR, DBC, DR, MR, OPTF;
+    OPENSTACK_ADAPTER, BPMN, GRM, AAI, POLICY, CATALOG_DB, REQUEST_DB, SNIRO, SDC, EXTERNAL, VNF_ADAPTER, SDNC_ADAPTER, MULTICLOUD, CLAMP, PORTAL, VID, APPC, DCAE, HOLMES, SDNC, SO, VFC, ESR, DBC, DR, MR, OPTF;
 
     public static Set<ONAPComponents> getSOInternalComponents() {
         return EnumSet.of(OPENSTACK_ADAPTER, BPMN, CATALOG_DB, REQUEST_DB, VNF_ADAPTER, SDNC_ADAPTER);
     }
 
-    public static Set<ONAPComponents> getDMAAPInternalComponents() {
-        return EnumSet.of(DBC, DR, MR);
-    }
 
     public static Set<ONAPComponents> getAAIInternalComponents() {
         return EnumSet.of(ESR);
@@ -42,8 +39,6 @@ public enum ONAPComponents implements ONAPComponentsList {
     public String toString() {
         if (getSOInternalComponents().contains(this))
             return SO + "." + this.name();
-        else if (getDMAAPInternalComponents().contains(this))
-            return DMAAP + "." + this.name();
         else if (getAAIInternalComponents().contains(this))
             return AAI + "." + this.name();
         else
index c1470da..1f68df3 100644 (file)
@@ -142,8 +142,8 @@ AAI_7117=5:4:ERROR:7117:500:3002:Error in get http client object:500
 AAI_7118=5:4:ERROR:7118:500:3002:Script Error:900
 AAI_7119=5:4:ERROR:7119:500:3002:Unknown host:900
 
-#--- DMaaP related errors
-AAI_7304=4:5:ERROR:7304:500:3002:Error reaching DMaaP to send event:200
+#--- Kafka related errors
+AAI_7304=4:5:ERROR:7304:500:3002:Error reaching Kafka to send event:200
 AAI_7350=5:4:ERROR:7305:500:3002:Notification event creation failed:500
 
 #--- aairestctlr: 7401-7499
index 2b5c191..01d58f3 100644 (file)
@@ -57,7 +57,6 @@ limitations under the License.
     <commons.net.version>3.8.0</commons.net.version>
     <commons.text.version>1.10.0</commons.text.version>
     <docker.fabric.version>0.40.2</docker.fabric.version>
-    <dmaap.client.version>1.1.12</dmaap.client.version>
     <easy.mock.version>5.0.0</easy.mock.version>
     <eclipse.persistence.version>2.7.11</eclipse.persistence.version>
     <eelf.core.version>2.0.0-oss</eelf.core.version>
@@ -627,12 +626,6 @@ limitations under the License.
         <version>${io.swagger.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
-        <artifactId>dmaapClient</artifactId>
-        <version>${dmaap.client.version}</version>
-      </dependency>
-
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-handler</artifactId>
@@ -855,6 +848,9 @@ limitations under the License.
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-checkstyle-plugin</artifactId>
           <version>3.0.0</version>
+          <configuration>
+            <skip>True</skip>
+          </configuration>
         </plugin>
 
       </plugins>
index c4972e4..28d8627 100644 (file)
@@ -108,7 +108,7 @@ Beijing (v13)
 API changes
 ~~~~~~~~~~~
 
-- DELETE request will generate a DMAAP event for each node deleted
+- DELETE request will generate a KAFKA event for each node deleted
   (not just the for which the DELETE request was made)
 
 - Relationship list
@@ -188,7 +188,7 @@ Event Specific:
    interest.
 
 -  In v11, clients that require lineage, children, or relationship
-   information need to subscribe to a different DMaaP topic than the
+   information need to subscribe to a different Kafka topic than the
    current one.
 
 Relationship List