Remove DMaaP dependency from AAI-Common
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / web / KafkaConfig.java
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
new file mode 100644 (file)
index 0000000..71ae5b6
--- /dev/null
@@ -0,0 +1,175 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.web;
+
+ import java.util.HashMap;
+ import java.util.Map;
+ import javax.annotation.PostConstruct;
+ import org.apache.activemq.ActiveMQConnectionFactory;
+ import org.apache.activemq.broker.BrokerService;
+ import org.apache.activemq.command.ActiveMQQueue;
+ import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.beans.factory.annotation.Value;
+ import org.springframework.context.ApplicationContext;
+ import org.springframework.context.annotation.Bean;
+ import org.springframework.context.annotation.Configuration;
+ import org.springframework.context.annotation.Profile;
+ import org.springframework.jms.connection.CachingConnectionFactory;
+ import org.springframework.jms.core.JmsTemplate;
+ import org.springframework.jms.listener.DefaultMessageListenerContainer;
+ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+ import org.springframework.kafka.core.KafkaTemplate;
+ import org.springframework.kafka.core.ProducerFactory;
+ @Profile("kafka")
+ @Configuration
+ public class KafkaConfig {
+     @Autowired
+     private ApplicationContext ctx;
+     @Value("${jms.bind.address}")
+     private String bindAddress;
+     @Value("${spring.kafka.producer.bootstrap-servers}")
+     private String bootstrapServers;
+     @Value("${spring.kafka.producer.properties.security.protocol}")
+     private String securityProtocol;
+     @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+     private String saslMechanism;
+     @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+     private String saslJaasConfig;
+     @Value("${spring.kafka.producer.retries}")
+     private Integer retries;
+     private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+     @PostConstruct
+     public void init() {
+         System.setProperty("activemq.tcp.url", bindAddress);
+     }
+     @Bean(destroyMethod = "stop")
+     public BrokerService brokerService() throws Exception {
+         BrokerService broker = new BrokerService();
+         broker.addConnector(bindAddress);
+         broker.setPersistent(false);
+         broker.setUseJmx(false);
+         broker.setSchedulerSupport(false);
+         broker.start();
+         return broker;
+     }
+     @Bean(name = "connectionFactory")
+     public ActiveMQConnectionFactory activeMQConnectionFactory() {
+         return new ActiveMQConnectionFactory(bindAddress);
+     }
+     @Bean
+     public CachingConnectionFactory cachingConnectionFactory() {
+         return new CachingConnectionFactory(activeMQConnectionFactory());
+     }
+     @Bean(name = "destinationQueue")
+     public ActiveMQQueue activeMQQueue() {
+         return new ActiveMQQueue("IN_QUEUE");
+     }
+     @Bean
+     public JmsTemplate jmsTemplate() {
+         JmsTemplate jmsTemplate = new JmsTemplate();
+         jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
+         jmsTemplate.setDefaultDestination(activeMQQueue());
+         return jmsTemplate;
+     }
+     @Bean
+     public AAIKafkaEventJMSProducer jmsProducer() {
+         return new AAIKafkaEventJMSProducer();
+     }
+     @Bean(name = "jmsConsumer")
+     public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
+         return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
+     }
+     @Bean
+     public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
+         DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+         messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
+         messageListenerContainer.setDestinationName("IN_QUEUE");
+         messageListenerContainer.setMessageListener(jmsConsumer());
+         return messageListenerContainer;
+     }
+     @Bean
+     public ProducerFactory<String, String> producerFactory() throws Exception {
+         Map<String, Object> props = new HashMap<>();
+         if(bootstrapServers == null){
+             logger.error("Environment Variable " + bootstrapServers + " is missing");
+             throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+         }
+         else{
+         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+         }
+         if(saslJaasConfig == null){
+             logger.info("Not using any authentication for kafka interaction");
+         }
+         else{
+             logger.info("Using authentication provided by kafka interaction");
+         // Strimzi Kafka security properties
+         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+         props.put("security.protocol", securityProtocol);
+         props.put("sasl.mechanism", saslMechanism);
+         props.put("sasl.jaas.config", saslJaasConfig);
+         props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
+         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
+         }
+         return new DefaultKafkaProducerFactory<>(props);
+     }
+     @Bean
+     public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
+         return new KafkaTemplate<>(producerFactory());
+     }
+ }
\ No newline at end of file