X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=aai-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fweb%2FKafkaConfig.java;fp=aai-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fweb%2FKafkaConfig.java;h=71ae5b6b108f6d6aad9754b3faabfaffa7cd4de7;hb=dff80ed76c8e0e6416e0688541f3094db3ca260a;hp=0000000000000000000000000000000000000000;hpb=dd7e9878066b0de0d8c0acddf58aec5702e83115;p=aai%2Faai-common.git diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java new file mode 100644 index 00000000..71ae5b6b --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -0,0 +1,175 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.web; + + import java.util.HashMap; + import java.util.Map; + + import javax.annotation.PostConstruct; + + import org.apache.activemq.ActiveMQConnectionFactory; + import org.apache.activemq.broker.BrokerService; + import org.apache.activemq.command.ActiveMQQueue; + import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.beans.factory.annotation.Value; + import org.springframework.context.ApplicationContext; + import org.springframework.context.annotation.Bean; + import org.springframework.context.annotation.Configuration; + import org.springframework.context.annotation.Profile; + import org.springframework.jms.connection.CachingConnectionFactory; + import org.springframework.jms.core.JmsTemplate; + import org.springframework.jms.listener.DefaultMessageListenerContainer; + import org.springframework.kafka.core.DefaultKafkaProducerFactory; + import org.springframework.kafka.core.KafkaTemplate; + import org.springframework.kafka.core.ProducerFactory; + + @Profile("kafka") + @Configuration + public class KafkaConfig { + + @Autowired + private ApplicationContext ctx; + + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private Integer retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @Bean(name = "connectionFactory") + public ActiveMQConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + public CachingConnectionFactory cachingConnectionFactory() { + return new CachingConnectionFactory(activeMQConnectionFactory()); + } + + @Bean(name = "destinationQueue") + public ActiveMQQueue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); + jmsTemplate.setDefaultDestination(activeMQQueue()); + + return jmsTemplate; + } + + @Bean + public AAIKafkaEventJMSProducer jmsProducer() { + return new AAIKafkaEventJMSProducer(); + } + + @Bean(name = "jmsConsumer") + public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { + return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(jmsConsumer()); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory producerFactory() throws Exception { + Map props = new HashMap<>(); + if(bootstrapServers == null){ + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } + else{ + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + if(saslJaasConfig == null){ + logger.info("Not using any authentication for kafka interaction"); + } + else{ + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate() throws Exception { + return new KafkaTemplate<>(producerFactory()); + } + } + \ No newline at end of file