Make JMS-based messaging compatible with tracing 21/138521/5
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Mon, 22 Jul 2024 12:21:26 +0000 (14:21 +0200)
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>
Tue, 23 Jul 2024 07:44:23 +0000 (09:44 +0200)
- use dependency injection instead of new Foo() for jms related classes
- inject interfaces and not their implementations
- add integration test that asserts message sending via JMS to Kafka [1]

[1] this also prepares removal of ActiveMQ as a middleman

Issue-ID: AAI-3932
Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
13 files changed:
aai-core/pom.xml
aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
aai-core/src/main/resources/logback.xml
aai-core/src/test/java/org/onap/aai/AAISetup.java
aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java [new file with mode: 0644]
aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java [deleted file]
aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java [new file with mode: 0644]
aai-core/src/test/resources/logback.xml
aai-core/src/test/resources/payloads/expected/aai-event.json [new file with mode: 0644]

index bd759f1..c091546 100644 (file)
@@ -387,6 +387,11 @@ limitations under the License.
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-jms</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>javax.jms</groupId>
+                       <artifactId>javax.jms-api</artifactId>
+                       <version>2.0.1</version>
+               </dependency>
                <dependency>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-core</artifactId>
index 731f3df..67f6842 100644 (file)
  * ============LICENSE_END=========================================================
  */
 
- package org.onap.aai.kafka;
-
- import java.util.Map;
- import java.util.Objects;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
- import org.json.JSONException;
- import org.json.JSONObject;
- import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
- import org.onap.aai.exceptions.AAIException;
- import org.onap.aai.logging.AaiElsErrorCode;
- import org.onap.aai.logging.ErrorLogHelper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.core.env.Environment;
- import org.springframework.kafka.core.KafkaTemplate;
- public class AAIKafkaEventJMSConsumer implements MessageListener {
-     private static final String EVENT_TOPIC = "event-topic";
-     private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
-     private Environment environment;
-     private Map<String, String> mdcCopy;
-     private KafkaTemplate<String,String> kafkaTemplate;
-     public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
-         super();
-         mdcCopy = MDC.getCopyOfContextMap();
-         Objects.nonNull(environment);
-         this.environment = environment;
-         this.kafkaTemplate=kafkaTemplate;
-     }
-     @Override
-     public void onMessage(Message message) {
-         if (kafkaTemplate == null) {
-             return;
-         }
-         String jsmMessageTxt = "";
-         String aaiEvent = "";
-         JSONObject aaiEventHeader;
-         JSONObject joPayload;
-         String transactionId = "";
-         String serviceName = "";
-         String eventName = "";
-         String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
-         String errorDescription = "";
-         if (mdcCopy != null) {
-             MDC.setContextMap(mdcCopy);
-         }
-         if (message instanceof TextMessage) {
-             AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
-             try {
-                 jsmMessageTxt = ((TextMessage) message).getText();
-                 JSONObject jo = new JSONObject(jsmMessageTxt);
-                 if (jo.has("aaiEventPayload")) {
-                     joPayload = jo.getJSONObject("aaiEventPayload");
-                     aaiEvent = joPayload.toString();
-                 } else {
-                     return;
-                 }
-                 if (jo.getString(EVENT_TOPIC) != null) {
-                     eventName = jo.getString(EVENT_TOPIC);
-                 }
-                 if (joPayload.has("event-header")) {
-                     try {
-                         aaiEventHeader = joPayload.getJSONObject("event-header");
-                         if (aaiEventHeader.has("id")) {
-                             transactionId = aaiEventHeader.get("id").toString();
-                         }
-                         if (aaiEventHeader.has("entity-link")) {
-                             serviceName = aaiEventHeader.get("entity-link").toString();
-                         }
-                     } catch (JSONException jexc) {
-                         // ignore, this is just used for logging
-                     }
-                 }
-                 metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-                 if ("AAI-EVENT".equals(eventName)) {
-                     // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
-                     kafkaTemplate.send(eventName,aaiEvent);
-                 } else {
-                     LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
-                 }
-             } catch (JMSException | JSONException e) {
-                 aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
-                 errorDescription = e.getMessage();
-                 ErrorLogHelper.logException(new AAIException("AAI_7350"));
-             } catch (Exception e) {
-                 aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
-                 errorDescription = e.getMessage();
-                 ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
-             } finally {
-                 metricLog.post(aaiElsErrorCode, errorDescription);
-             }
-         }
-     }
- }
\ No newline at end of file
+package org.onap.aai.kafka;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+import org.onap.aai.exceptions.AAIException;
+import org.onap.aai.logging.AaiElsErrorCode;
+import org.onap.aai.logging.ErrorLogHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.kafka.core.KafkaTemplate;
+
+
+public class AAIKafkaEventJMSConsumer implements MessageListener {
+
+    private static final String EVENT_TOPIC = "event-topic";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+
+    private Map<String, String> mdcCopy;
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    public AAIKafkaEventJMSConsumer(KafkaTemplate<String, String> kafkaTemplate) {
+        super();
+        mdcCopy = MDC.getCopyOfContextMap();
+        this.kafkaTemplate = kafkaTemplate;
+    }
+
+    @Override
+    public void onMessage(Message message) {
+
+        if (kafkaTemplate == null) {
+            return;
+        }
+
+        String jmsMessageText = "";
+        String aaiEvent = "";
+        JSONObject aaiEventHeader;
+        JSONObject aaiEventPayload;
+        String transactionId = "";
+        String serviceName = "";
+        String topicName = "";
+        String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+        String errorDescription = "";
+
+        if (mdcCopy != null) {
+            MDC.setContextMap(mdcCopy);
+        }
+
+        if (message instanceof TextMessage) {
+            AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+            try {
+                jmsMessageText = ((TextMessage) message).getText();
+                JSONObject jsonObject = new JSONObject(jmsMessageText);
+                if (jsonObject.has("aaiEventPayload")) {
+                    aaiEventPayload = jsonObject.getJSONObject("aaiEventPayload");
+                    aaiEvent = aaiEventPayload.toString();
+                } else {
+                    return;
+                }
+                if (jsonObject.getString(EVENT_TOPIC) != null) {
+                    topicName = jsonObject.getString(EVENT_TOPIC);
+                }
+                if (aaiEventPayload.has("event-header")) {
+                    try {
+                        aaiEventHeader = aaiEventPayload.getJSONObject("event-header");
+                        if (aaiEventHeader.has("id")) {
+                            transactionId = aaiEventHeader.get("id").toString();
+                        }
+                        if (aaiEventHeader.has("entity-link")) {
+                            serviceName = aaiEventHeader.get("entity-link").toString();
+                        }
+                    } catch (JSONException jexc) {
+                        // ignore, this is just used for logging
+                    }
+                }
+                metricLog.pre(topicName, aaiEvent, transactionId, serviceName);
+
+                if ("AAI-EVENT".equals(topicName)) {
+
+                    kafkaTemplate.send(topicName, aaiEvent);
+
+                } else {
+                    LOGGER.error(String.format("%s|Event Topic invalid.", topicName));
+                }
+            } catch (JMSException | JSONException e) {
+                aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+                errorDescription = e.getMessage();
+                ErrorLogHelper.logException(new AAIException("AAI_7350"));
+            } catch (Exception e) {
+                e.printStackTrace();
+                // LOGGER.error();
+                LOGGER.error(e.getMessage());
+                aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+                errorDescription = e.getMessage();
+                String errorMessage = String.format("Error processing message: %s, message payload: %s", e.getMessage(), jmsMessageText);
+                ErrorLogHelper.logException(new AAIException("AAI_7304", errorMessage));
+            } finally {
+                metricLog.post(aaiElsErrorCode, errorDescription);
+            }
+        }
+    }
+}
index 00cf677..a46f2e9 100644 (file)
 
 package org.onap.aai.kafka;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.json.JSONObject;
 import org.onap.aai.util.AAIConfig;
-import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.stereotype.Service;
 
+import lombok.RequiredArgsConstructor;
+
+@Service
+@RequiredArgsConstructor
 public class AAIKafkaEventJMSProducer implements MessageProducer {
 
-    private JmsTemplate jmsTemplate;
+    @Value("${aai.events.enabled:true}") private boolean eventsEnabled;
+    private final JmsTemplate jmsTemplate;
 
-    public AAIKafkaEventJMSProducer() {
-        if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
-            this.jmsTemplate = new JmsTemplate();
-            String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
-            this.jmsTemplate
-                    .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl)));
-            this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE"));
+    public void sendMessageToDefaultDestination(String msg) {
+        if (eventsEnabled) {
+            jmsTemplate.convertAndSend(msg);
         }
     }
 
     public void sendMessageToDefaultDestination(JSONObject finalJson) {
-        if (jmsTemplate != null) {
-            jmsTemplate.convertAndSend(finalJson.toString());
-            CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
-            if (ccf != null) {
-                ccf.destroy();
-            }
-        }
-    }
-
-    public void sendMessageToDefaultDestination(String msg) {
-        if (jmsTemplate != null) {
-            jmsTemplate.convertAndSend(msg);
-            CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
-            if (ccf != null) {
-                ccf.destroy();
-            }
-        }
+        sendMessageToDefaultDestination(finalJson.toString());
     }
 }
index 6f3e888..127cf53 100644 (file)
@@ -39,14 +39,18 @@ import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
 import org.onap.aai.kafka.MessageProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
 import org.springframework.core.env.Environment;
+import org.springframework.jms.core.JmsTemplate;
 
 public class StoreNotificationEvent {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(StoreNotificationEvent.class);
 
-    private MessageProducer messageProducer;
+    @Autowired JmsTemplate jmsTemplate;
+
+    private final MessageProducer messageProducer;
     private String fromAppId = "";
     private String transId = "";
     private final String transactionId;
@@ -59,12 +63,12 @@ public class StoreNotificationEvent {
      * Instantiates a new store notification event.
      */
     public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
-        this.messageProducer = new AAIKafkaEventJMSProducer();
+        this.messageProducer = new AAIKafkaEventJMSProducer(jmsTemplate);
         this.transactionId = transactionId;
         this.sourceOfTruth = sourceOfTruth;
     }
 
-    public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+    public StoreNotificationEvent(MessageProducer producer, String transactionId, String sourceOfTruth) {
         this.messageProducer = producer;
         this.transactionId = transactionId;
         this.sourceOfTruth = sourceOfTruth;
index 6fbc297..b2821b0 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.aai.util.delta;
 
-import com.google.gson.*;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
@@ -28,38 +27,35 @@ import java.util.Date;
 import java.util.Map;
 
 import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
 import org.onap.aai.kafka.MessageProducer;
 import org.onap.aai.util.AAIConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 
-public class DeltaEvents {
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(DeltaEvents.class);
+public class DeltaEvents {
 
     private static final Gson gson =
             new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES).create();
+    private static final String eventVersion = "v1";
 
-    private String transId;
-    private String sourceName;
-    private String eventVersion = "v1";
-    private String schemaVersion;
-    private Map<String, ObjectDelta> objectDeltas;
+    private final String transId;
+    private final String sourceName;
+    private final String schemaVersion;
+    private final Map<String, ObjectDelta> objectDeltas;
 
-    private MessageProducer messageProducer;
+    @Autowired private MessageProducer messageProducer;
 
     public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
-        this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
-    }
-
-    public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
-            MessageProducer messageProducer) {
-        this.transId = transId;
-        this.sourceName = sourceName;
-        this.schemaVersion = schemaVersion;
-        this.objectDeltas = objectDeltas;
-        this.messageProducer = messageProducer;
+    this.transId = transId;
+    this.sourceName = sourceName;
+    this.schemaVersion = schemaVersion;
+    this.objectDeltas = objectDeltas;
     }
 
     public boolean triggerEvents() {
@@ -98,7 +94,7 @@ public class DeltaEvents {
         header.addProperty("source-name", this.sourceName);
         header.addProperty("domain", this.getDomain());
         header.addProperty("event-type", this.getEventType());
-        header.addProperty("event-version", this.eventVersion);
+        header.addProperty("event-version", eventVersion);
         header.addProperty("schema-version", this.schemaVersion);
         header.addProperty("action", first.getAction().toString());
         header.addProperty("entity-type", this.getEntityType(first));
@@ -126,7 +122,7 @@ public class DeltaEvents {
 
     /**
      * Given Long timestamp convert to format YYYYMMdd-HH:mm:ss:SSS
-     * 
+     *
      * @param timestamp milliseconds since epoc
      * @return long timestamp in format YYYYMMdd-HH:mm:ss:SSS
      */
index 71ae5b6..05e4768 100644 (file)
  * ============LICENSE_END=========================================================
  */
 
- package org.onap.aai.web;
-
- import java.util.HashMap;
- import java.util.Map;
- import javax.annotation.PostConstruct;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.broker.BrokerService;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.kafka.clients.producer.ProducerConfig;
+package org.onap.aai.web;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.introspection.LoaderFactory;
 import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
 import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
+import org.onap.aai.rest.notification.NotificationService;
 import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Profile;
- import org.springframework.jms.connection.CachingConnectionFactory;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.listener.DefaultMessageListenerContainer;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
- @Profile("kafka")
- @Configuration
- public class KafkaConfig {
-     @Autowired
-     private ApplicationContext ctx;
-     @Value("${jms.bind.address}")
-     private String bindAddress;
-     @Value("${spring.kafka.producer.bootstrap-servers}")
-     private String bootstrapServers;
-     @Value("${spring.kafka.producer.properties.security.protocol}")
-     private String securityProtocol;
-     @Value("${spring.kafka.producer.properties.sasl.mechanism}")
-     private String saslMechanism;
-     @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
-     private String saslJaasConfig;
-     @Value("${spring.kafka.producer.retries}")
-     private Integer retries;
-     private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
-     @PostConstruct
-     public void init() {
-         System.setProperty("activemq.tcp.url", bindAddress);
-     }
-     @Bean(destroyMethod = "stop")
-     public BrokerService brokerService() throws Exception {
-         BrokerService broker = new BrokerService();
-         broker.addConnector(bindAddress);
-         broker.setPersistent(false);
-         broker.setUseJmx(false);
-         broker.setSchedulerSupport(false);
-         broker.start();
-         return broker;
-     }
-     @Bean(name = "connectionFactory")
-     public ActiveMQConnectionFactory activeMQConnectionFactory() {
-         return new ActiveMQConnectionFactory(bindAddress);
-     }
-     @Bean
-     public CachingConnectionFactory cachingConnectionFactory() {
-         return new CachingConnectionFactory(activeMQConnectionFactory());
-     }
-     @Bean(name = "destinationQueue")
-     public ActiveMQQueue activeMQQueue() {
-         return new ActiveMQQueue("IN_QUEUE");
-     }
-     @Bean
-     public JmsTemplate jmsTemplate() {
-         JmsTemplate jmsTemplate = new JmsTemplate();
-         jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
-         jmsTemplate.setDefaultDestination(activeMQQueue());
-         return jmsTemplate;
-     }
-     @Bean
-     public AAIKafkaEventJMSProducer jmsProducer() {
-         return new AAIKafkaEventJMSProducer();
-     }
-     @Bean(name = "jmsConsumer")
-     public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
-         return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
-     }
-     @Bean
-     public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-         DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-         messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
-         messageListenerContainer.setDestinationName("IN_QUEUE");
-         messageListenerContainer.setMessageListener(jmsConsumer());
-         return messageListenerContainer;
-     }
-     @Bean
-     public ProducerFactory<String, String> producerFactory() throws Exception {
-         Map<String, Object> props = new HashMap<>();
-         if(bootstrapServers == null){
-             logger.error("Environment Variable " + bootstrapServers + " is missing");
-             throw new Exception("Environment Variable " + bootstrapServers + " is missing");
-         }
-         else{
-         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-         }
-         if(saslJaasConfig == null){
-             logger.info("Not using any authentication for kafka interaction");
-         }
-         else{
-             logger.info("Using authentication provided by kafka interaction");
-         // Strimzi Kafka security properties
-         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-         props.put("security.protocol", securityProtocol);
-         props.put("sasl.mechanism", saslMechanism);
-         props.put("sasl.jaas.config", saslJaasConfig);
-         props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
-         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
-         }
-         return new DefaultKafkaProducerFactory<>(props);
-     }
-     @Bean
-     public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
-         return new KafkaTemplate<>(producerFactory());
-     }
- }
\ No newline at end of file
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+@Profile("kafka")
+@Configuration
+public class KafkaConfig {
+
+    @Value("${jms.bind.address}")
+    private String bindAddress;
+
+    @Value("${spring.kafka.producer.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.producer.properties.security.protocol}")
+    private String securityProtocol;
+
+    @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+    private String saslMechanism;
+
+    @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+    private String saslJaasConfig;
+
+    @Value("${spring.kafka.producer.retries}")
+    private String retries;
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+
+    @PostConstruct
+    public void init() {
+        System.setProperty("activemq.tcp.url", bindAddress);
+    }
+
+    @Bean(destroyMethod = "stop")
+    public BrokerService brokerService() throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.addConnector(bindAddress);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setSchedulerSupport(false);
+        broker.start();
+
+        return broker;
+    }
+
+    @ConditionalOnMissingBean
+    @Bean(name = "connectionFactory")
+    public ConnectionFactory activeMQConnectionFactory() {
+        return new ActiveMQConnectionFactory(bindAddress);
+    }
+
+    @Bean
+    @ConditionalOnMissingBean
+    public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
+        return new CachingConnectionFactory(targetConnectionFactory);
+    }
+
+    @Bean(name = "destinationQueue")
+    public Queue activeMQQueue() {
+        return new ActiveMQQueue("IN_QUEUE");
+    }
+
+    @Bean
+    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, Queue queue) {
+        JmsTemplate jmsTemplate = new JmsTemplate();
+
+        jmsTemplate.setConnectionFactory(connectionFactory);
+        jmsTemplate.setDefaultDestination(queue);
+
+        return jmsTemplate;
+    }
+
+    @Bean(name = "jmsConsumer")
+    public MessageListener jmsConsumer(KafkaTemplate<String, String> kafkaTemplate) throws Exception {
+        return new AAIKafkaEventJMSConsumer(kafkaTemplate);
+    }
+
+    @Bean
+    public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener)
+            throws Exception {
+
+        DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+
+        messageListenerContainer.setConnectionFactory(connectionFactory);
+        messageListenerContainer.setDestinationName("IN_QUEUE");
+        messageListenerContainer.setMessageListener(messageListener);
+
+        return messageListenerContainer;
+    }
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() throws Exception {
+        Map<String, Object> props = new HashMap<>();
+        if (bootstrapServers == null) {
+            logger.error("Environment Variable " + bootstrapServers + " is missing");
+            throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+        } else {
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        }
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+
+        if (saslJaasConfig == null) {
+            logger.info("Not using any authentication for kafka interaction");
+        } else {
+            logger.info("Using authentication provided by kafka interaction");
+            // Strimzi Kafka security properties
+            props.put("security.protocol", securityProtocol);
+            props.put("sasl.mechanism", saslMechanism);
+            props.put("sasl.jaas.config", saslJaasConfig);
+        }
+
+        return new DefaultKafkaProducerFactory<>(props);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) throws Exception {
+        return new KafkaTemplate<>(producerFactory);
+    }
+
+    @Bean
+    public MessageProducer messageProducer(JmsTemplate jmsTemplate) {
+        return new AAIKafkaEventJMSProducer(jmsTemplate);
+    }
+
+    @Bean
+    public NotificationService notificationService(LoaderFactory loaderFactory,
+    @Value("${schema.uri.base.path}") String basePath,
+    @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) {
+        return new NotificationService(loaderFactory, basePath, isDeltaEventsEnabled);
+    }
+}
index fc66b32..ba5b3de 100644 (file)
                        <pattern>${eelfTransLogPattern}</pattern>
                </encoder>
        </appender>
-       
+
        <appender name="asynctranslog" class="ch.qos.logback.classic.AsyncAppender">
                <queueSize>1000</queueSize>
                <includeCallerData>true</includeCallerData>
        <logger name="ajsc.UserDefinedBeansDefService" level="WARN" />
        <logger name="ajsc.LoggingConfigurationService" level="WARN" />
 
-       <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet 
+       <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
                logging) -->
        <logger name="org.codehaus.groovy" level="WARN" />
        <logger name="com.att.scamper" level="WARN" />
        <logger name="org.apache.coyote" level="WARN" />
        <logger name="org.apache.jasper" level="WARN" />
 
-       <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging. 
+       <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
                May aid in troubleshooting) -->
        <logger name="org.apache.camel" level="WARN" />
        <logger name="org.apache.cxf" level="WARN" />
        </logger>
 
        <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
-               <appender-ref ref="kafkaAAIEventConsumer" />
-               <appender-ref ref="kafkaAAIEventConsumerDebug" />
-               <appender-ref ref="kafkaAAIEventConsumerMetric" />
+               <appender-ref ref="STDOUT" />
        </logger>
 
        <logger name="org.apache" level="OFF" />
index 16f21ff..08a0e91 100644 (file)
@@ -40,6 +40,7 @@ import org.onap.aai.setup.AAIConfigTranslator;
 import org.onap.aai.setup.SchemaVersion;
 import org.onap.aai.setup.SchemaVersions;
 import org.onap.aai.util.AAIConstants;
+import org.onap.aai.web.KafkaConfig;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.test.context.ContextConfiguration;
@@ -52,7 +53,8 @@ import org.springframework.test.context.web.WebAppConfiguration;
 @ContextConfiguration(
         classes = {ConfigConfiguration.class, AAIConfigTranslator.class, EdgeIngestor.class, EdgeSerializer.class,
                 NodeIngestor.class, SpringContextAware.class, IntrospectionConfig.class, RestBeanConfig.class,
-                XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class, LoaderFactory.class, NotificationService.class})
+                XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class,
+                KafkaConfig.class, LoaderFactory.class, NotificationService.class})
 @TestPropertySource(
         properties = {"schema.uri.base.path = /aai", "schema.xsd.maxoccurs = 5000", "schema.translator.list=config",
                 "schema.nodes.location=src/test/resources/onap/oxm",
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
new file mode 100644 (file)
index 0000000..c10260d
--- /dev/null
@@ -0,0 +1,97 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.onap.aai.AAISetup;
+import org.onap.aai.PayloadUtil;
+import org.onap.aai.restcore.HttpMethod;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@ActiveProfiles("kafka")
+@Import(KafkaTestConfiguration.class)
+@EmbeddedKafka(partitions = 1, topics = { "AAI-EVENT" })
+@TestPropertySource(
+        properties = {
+          "jms.bind.address=tcp://localhost:61647",
+          "aai.events.enabled=true",
+          "spring.kafka.producer.retries=0",
+          "spring.kafka.producer.properties.sasl.jaas.config=#{null}",
+          "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}"
+        })
+public class AAIKafkaEventIntegrationTest extends AAISetup {
+
+    @Mock
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @Autowired
+    MessageProducer messageProducer;
+
+    @Autowired
+    private ConsumerFactory<String, String> consumerFactory;
+
+    @Test
+    public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
+            throws Exception {
+        Consumer<String, String> consumer = consumerFactory.createConsumer();
+
+        consumer.subscribe(Collections.singletonList("AAI-EVENT"));
+
+        String payload = PayloadUtil.getResourcePayload("aai-event.json");
+        String expectedResponse = PayloadUtil.getExpectedPayload("aai-event.json");
+        messageProducer.sendMessageToDefaultDestination(payload);
+
+        ConsumerRecords<String, String> consumerRecords = KafkaTestUtils.getRecords(consumer, 10000);
+        assertFalse(consumerRecords.isEmpty());
+        consumerRecords.forEach(consumerRecord -> {
+            JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.NON_EXTENSIBLE);
+        });
+    }
+
+}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
deleted file mode 100644 (file)
index c72499c..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.onap.aai.kafka;
-
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import javax.jms.TextMessage;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.aai.PayloadUtil;
-import org.springframework.core.env.Environment;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.util.ReflectionTestUtils;
-
-@RunWith(MockitoJUnitRunner.class)
-@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
-public class AAIKafkaEventJMSConsumerTest {
-
-    @Mock
-    private Environment environment;
-
-    @Mock
-    private KafkaTemplate<String,String> kafkaTemplate;
-
-    private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer;
-
-    @Before
-    public void setUp(){
-        aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate);
-    }
-
-    @Test
-    public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
-    throws Exception
-    {
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        String payload = PayloadUtil.getResourcePayload("aai-event.json");
-
-        when(mockTextMessage.getText()).thenReturn(payload);
-        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
-        verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString());
-    }
-
-    @Test
-    public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json");
-        when(mockTextMessage.getText()).thenReturn(payload);
-        aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
-    }
-
-
-    @Test
-    public void onMessage_shouldHandleJSONException() throws Exception {
-        // Arrange
-        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate
-
-        // Act
-        consumer.onMessage(mockTextMessage);
-
-        // Assert
-        // Verify that exception is logged
-    }
-
-    @Test
-    public void onMessage_shouldHandleGenericException() throws Exception {
-        // Arrange
-        AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
-        TextMessage mockTextMessage = mock(TextMessage.class);
-        when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields
-
-        // Act
-        consumer.onMessage(mockTextMessage);
-
-        // Assert
-        // Verify that exception is logged
-    }
-
-}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java
new file mode 100644 (file)
index 0000000..730699e
--- /dev/null
@@ -0,0 +1,77 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.test.context.TestPropertySource;
+
+@TestConfiguration
+public class KafkaTestConfiguration {
+
+  @Value("${spring.embedded.kafka.brokers}") private String bootstrapAddress;
+
+  private String groupId = "test-consumer";
+
+  @Bean
+  public KafkaAdmin kafkaAdmin() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+    return new KafkaAdmin(configs);
+  }
+
+  @Bean
+  public ConsumerFactory<String, String> consumerFactory() {
+    Map<String, Object> props = new HashMap<>();
+    props.put(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        bootstrapAddress);
+    props.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        groupId);
+    props.put(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class);
+    props.put(
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    return new DefaultKafkaConsumerFactory<>(props);
+  }
+
+  @Bean
+  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
+
+    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+    factory.setConsumerFactory(consumerFactory);
+    return factory;
+  }
+}
index 4c82c0b..6acc77f 100644 (file)
                        <pattern>${eelfTransLogPattern}</pattern>
                </encoder>
        </appender>
-       
+
        <appender name="asynctranslog" class="ch.qos.logback.classic.AsyncAppender">
                <queueSize>1000</queueSize>
                <includeCallerData>true</includeCallerData>
        <logger name="ajsc.UserDefinedBeansDefService" level="WARN" />
        <logger name="ajsc.LoggingConfigurationService" level="WARN" />
 
-       <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet 
+       <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
                logging) -->
        <logger name="org.codehaus.groovy" level="WARN" />
        <logger name="com.att.scamper" level="WARN" />
        <logger name="org.apache.coyote" level="WARN" />
        <logger name="org.apache.jasper" level="WARN" />
 
-       <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging. 
+       <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
                May aid in troubleshooting) -->
        <logger name="org.apache.camel" level="WARN" />
        <logger name="org.apache.cxf" level="WARN" />
        </logger>
 
        <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
-               <appender-ref ref="kafkaAAIEventConsumer" />
-               <appender-ref ref="kafkaAAIEventConsumerDebug" />
-               <appender-ref ref="kafkaAAIEventConsumerMetric" />
+               <appender-ref ref="STDOUT" />
        </logger>
 
        <logger name="org.apache" level="WARN" />
diff --git a/aai-core/src/test/resources/payloads/expected/aai-event.json b/aai-core/src/test/resources/payloads/expected/aai-event.json
new file mode 100644 (file)
index 0000000..86c6799
--- /dev/null
@@ -0,0 +1,61 @@
+{
+    "cambria.partition": "AAI",
+    "event-header": {
+        "severity": "NORMAL",
+        "entity-type": "object-group",
+        "top-entity-type": "object-group",
+        "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+        "event-type": "AAI-EVENT",
+        "domain": "dev",
+        "action": "UPDATE",
+        "sequence-number": "0",
+        "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+        "source-name": "postman-api",
+        "version": "v28",
+        "timestamp": "20231207-12:14:44:757"
+    },
+    "entity": {
+        "relationship-list": {
+            "relationship": [
+                {
+                    "related-to": "cell",
+                    "relationship-data": [
+                        {
+                            "relationship-value": "445611193273958916",
+                            "relationship-key": "cell.cell-id"
+                        }
+                    ],
+                    "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+                    "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                    "related-to-property": [
+                        {
+                            "property-key": "cell.cell-name",
+                            "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+                        }
+                    ]
+                },
+                {
+                    "related-to": "cell",
+                    "relationship-data": [
+                        {
+                            "relationship-value": "445611193272330241",
+                            "relationship-key": "cell.cell-id"
+                        }
+                    ],
+                    "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+                    "relationship-label": "org.onap.relationships.inventory.MemberOf",
+                    "related-to-property": [
+                        {
+                            "property-key": "cell.cell-name",
+                            "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+                        }
+                    ]
+                }
+            ]
+        },
+        "group-name": "Urban",
+        "resource-version": "1701951284582",
+        "group-type": "cell",
+        "object-group-id": "ric_cluster"
+    }
+}