2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.aai.web;
23 import java.util.HashMap;
26 import javax.annotation.PostConstruct;
28 import org.apache.activemq.ActiveMQConnectionFactory;
29 import org.apache.activemq.broker.BrokerService;
30 import org.apache.activemq.command.ActiveMQQueue;
31 import org.apache.kafka.clients.producer.ProducerConfig;
32 import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
33 import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.beans.factory.annotation.Value;
38 import org.springframework.context.ApplicationContext;
39 import org.springframework.context.annotation.Bean;
40 import org.springframework.context.annotation.Configuration;
41 import org.springframework.context.annotation.Profile;
42 import org.springframework.jms.connection.CachingConnectionFactory;
43 import org.springframework.jms.core.JmsTemplate;
44 import org.springframework.jms.listener.DefaultMessageListenerContainer;
45 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
46 import org.springframework.kafka.core.KafkaTemplate;
47 import org.springframework.kafka.core.ProducerFactory;
51 public class KafkaConfig {
/**
 * Spring application context; its environment is passed to the JMS consumer.
 * NOTE(review): no injection annotation is visible on this field in this view -
 * presumably {@code @Autowired}; confirm.
 */
54 private ApplicationContext ctx;
/** Address used for the ActiveMQ broker connector and connection factory ({@code jms.bind.address}). */
57 @Value("${jms.bind.address}")
58 private String bindAddress;
/** Kafka bootstrap servers for the producer ({@code spring.kafka.producer.bootstrap-servers}). */
60 @Value("${spring.kafka.producer.bootstrap-servers}")
61 private String bootstrapServers;
/** Value placed under the producer's {@code security.protocol} key when a JAAS config is present. */
63 @Value("${spring.kafka.producer.properties.security.protocol}")
64 private String securityProtocol;
/** Value placed under the producer's {@code sasl.mechanism} key when a JAAS config is present. */
66 @Value("${spring.kafka.producer.properties.sasl.mechanism}")
67 private String saslMechanism;
/** SASL JAAS configuration; a {@code null} value makes the producer factory skip the security settings. */
69 @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
70 private String saslJaasConfig;
/** Producer retry count ({@code spring.kafka.producer.retries}). */
72 @Value("${spring.kafka.producer.retries}")
73 private Integer retries;
/** Class-wide SLF4J logger. */
75 private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
// Publishes the configured JMS bind address as the "activemq.tcp.url" system property.
// NOTE(review): the declaration of the enclosing method is not visible in this view -
// presumably a @PostConstruct initializer; confirm.
79 System.setProperty("activemq.tcp.url", bindAddress);
/**
 * Creates an ActiveMQ {@link BrokerService} with a connector on {@code bindAddress} and with
 * persistence, JMX and scheduler support all switched off. The bean declaration names
 * {@code stop} as its destroy method.
 *
 * NOTE(review): the tail of this method (the return statement and any call that starts the
 * broker) is not visible in this view - confirm before relying on this description.
 *
 * @throws Exception declared by the signature; presumably raised while setting up the
 *         connector - confirm
 */
82 @Bean(destroyMethod = "stop")
83 public BrokerService brokerService() throws Exception {
85 BrokerService broker = new BrokerService();
86 broker.addConnector(bindAddress);
// In-memory only: no message persistence, no JMX, no scheduler.
87 broker.setPersistent(false);
88 broker.setUseJmx(false);
89 broker.setSchedulerSupport(false);
95 @Bean(name = "connectionFactory")
96 public ActiveMQConnectionFactory activeMQConnectionFactory() {
97 return new ActiveMQConnectionFactory(bindAddress);
101 public CachingConnectionFactory cachingConnectionFactory() {
102 return new CachingConnectionFactory(activeMQConnectionFactory());
105 @Bean(name = "destinationQueue")
106 public ActiveMQQueue activeMQQueue() {
107 return new ActiveMQQueue("IN_QUEUE");
/**
 * Builds a {@link JmsTemplate} that uses the ActiveMQ connection factory and whose default
 * destination is the queue from {@link #activeMQQueue()}.
 *
 * NOTE(review): the template is wired to the plain ActiveMQ connection factory rather than
 * {@link #cachingConnectionFactory()} - confirm this is intentional.
 * NOTE(review): the return statement of this method is not visible in this view.
 */
111 public JmsTemplate jmsTemplate() {
112 JmsTemplate jmsTemplate = new JmsTemplate();
114 jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
115 jmsTemplate.setDefaultDestination(activeMQQueue());
121 public AAIKafkaEventJMSProducer jmsProducer() {
122 return new AAIKafkaEventJMSProducer();
125 @Bean(name = "jmsConsumer")
126 public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
127 return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
131 public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
133 DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
135 messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
136 messageListenerContainer.setDestinationName("IN_QUEUE");
137 messageListenerContainer.setMessageListener(jmsConsumer());
139 return messageListenerContainer;
143 public ProducerFactory<String, String> producerFactory() throws Exception {
144 Map<String, Object> props = new HashMap<>();
145 if(bootstrapServers == null){
146 logger.error("Environment Variable " + bootstrapServers + " is missing");
147 throw new Exception("Environment Variable " + bootstrapServers + " is missing");
150 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
152 if(saslJaasConfig == null){
153 logger.info("Not using any authentication for kafka interaction");
156 logger.info("Using authentication provided by kafka interaction");
157 // Strimzi Kafka security properties
158 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
159 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
160 props.put("security.protocol", securityProtocol);
161 props.put("sasl.mechanism", saslMechanism);
162 props.put("sasl.jaas.config", saslJaasConfig);
163 props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
164 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
167 return new DefaultKafkaProducerFactory<>(props);
171 public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
172 return new KafkaTemplate<>(producerFactory());