Remove DMaaP dependency from AAI-Common
[aai/aai-common.git] / aai-core / src / main / java / org / onap / aai / web / KafkaConfig.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *    http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21  package org.onap.aai.web;
22
23  import java.util.HashMap;
24  import java.util.Map;
25  
26  import javax.annotation.PostConstruct;
27  
28  import org.apache.activemq.ActiveMQConnectionFactory;
29  import org.apache.activemq.broker.BrokerService;
30  import org.apache.activemq.command.ActiveMQQueue;
31  import org.apache.kafka.clients.producer.ProducerConfig;
32 import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
33 import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
34 import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  import org.springframework.beans.factory.annotation.Autowired;
37  import org.springframework.beans.factory.annotation.Value;
38  import org.springframework.context.ApplicationContext;
39  import org.springframework.context.annotation.Bean;
40  import org.springframework.context.annotation.Configuration;
41  import org.springframework.context.annotation.Profile;
42  import org.springframework.jms.connection.CachingConnectionFactory;
43  import org.springframework.jms.core.JmsTemplate;
44  import org.springframework.jms.listener.DefaultMessageListenerContainer;
45  import org.springframework.kafka.core.DefaultKafkaProducerFactory;
46  import org.springframework.kafka.core.KafkaTemplate;
47  import org.springframework.kafka.core.ProducerFactory;
48  
49  @Profile("kafka")
50  @Configuration
51  public class KafkaConfig {
52  
53      @Autowired
54      private ApplicationContext ctx;
55  
56  
57      @Value("${jms.bind.address}")
58      private String bindAddress;
59  
60      @Value("${spring.kafka.producer.bootstrap-servers}")
61      private String bootstrapServers;
62  
63      @Value("${spring.kafka.producer.properties.security.protocol}")
64      private String securityProtocol;
65  
66      @Value("${spring.kafka.producer.properties.sasl.mechanism}")
67      private String saslMechanism;
68  
69      @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
70      private String saslJaasConfig;
71  
72      @Value("${spring.kafka.producer.retries}")
73      private Integer retries;
74  
75      private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
76  
77      @PostConstruct
78      public void init() {
79          System.setProperty("activemq.tcp.url", bindAddress);
80      }
81  
82      @Bean(destroyMethod = "stop")
83      public BrokerService brokerService() throws Exception {
84  
85          BrokerService broker = new BrokerService();
86          broker.addConnector(bindAddress);
87          broker.setPersistent(false);
88          broker.setUseJmx(false);
89          broker.setSchedulerSupport(false);
90          broker.start();
91  
92          return broker;
93      }
94  
95      @Bean(name = "connectionFactory")
96      public ActiveMQConnectionFactory activeMQConnectionFactory() {
97          return new ActiveMQConnectionFactory(bindAddress);
98      }
99  
100      @Bean
101      public CachingConnectionFactory cachingConnectionFactory() {
102          return new CachingConnectionFactory(activeMQConnectionFactory());
103      }
104  
105      @Bean(name = "destinationQueue")
106      public ActiveMQQueue activeMQQueue() {
107          return new ActiveMQQueue("IN_QUEUE");
108      }
109  
110      @Bean
111      public JmsTemplate jmsTemplate() {
112          JmsTemplate jmsTemplate = new JmsTemplate();
113  
114          jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
115          jmsTemplate.setDefaultDestination(activeMQQueue());
116  
117          return jmsTemplate;
118      }
119  
120      @Bean
121      public AAIKafkaEventJMSProducer jmsProducer() {
122          return new AAIKafkaEventJMSProducer();
123      }
124  
125      @Bean(name = "jmsConsumer")
126      public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
127          return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
128      }
129  
130      @Bean
131      public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
132  
133          DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
134  
135          messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
136          messageListenerContainer.setDestinationName("IN_QUEUE");
137          messageListenerContainer.setMessageListener(jmsConsumer());
138  
139          return messageListenerContainer;
140      }
141  
142      @Bean
143      public ProducerFactory<String, String> producerFactory() throws Exception {
144          Map<String, Object> props = new HashMap<>();
145          if(bootstrapServers == null){
146              logger.error("Environment Variable " + bootstrapServers + " is missing");
147              throw new Exception("Environment Variable " + bootstrapServers + " is missing");
148          }
149          else{
150          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
151          }
152          if(saslJaasConfig == null){
153              logger.info("Not using any authentication for kafka interaction");
154          }
155          else{
156              logger.info("Using authentication provided by kafka interaction");
157          // Strimzi Kafka security properties
158          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
159          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
160          props.put("security.protocol", securityProtocol);
161          props.put("sasl.mechanism", saslMechanism);
162          props.put("sasl.jaas.config", saslJaasConfig);
163          props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
164          props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
165          }
166  
167          return new DefaultKafkaProducerFactory<>(props);
168      }
169  
170      @Bean
171      public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
172          return new KafkaTemplate<>(producerFactory());
173      }
174  }
175