Merge "Add integration test for extending API: Get Module Definitions"
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / config / kafka / KafkaConfig.java
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2023 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.config.kafka;
22
23 import io.cloudevents.CloudEvent;
24 import java.time.Duration;
25 import java.util.Map;
26 import lombok.RequiredArgsConstructor;
27 import org.apache.kafka.clients.producer.ProducerConfig;
28 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
29 import org.springframework.context.annotation.Bean;
30 import org.springframework.context.annotation.Configuration;
31 import org.springframework.context.annotation.Primary;
32 import org.springframework.kafka.annotation.EnableKafka;
33 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34 import org.springframework.kafka.core.ConsumerFactory;
35 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
36 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
37 import org.springframework.kafka.core.KafkaTemplate;
38 import org.springframework.kafka.core.ProducerFactory;
39 import org.springframework.kafka.support.serializer.JsonDeserializer;
40 import org.springframework.kafka.support.serializer.JsonSerializer;
41
42 /**
43  * kafka Configuration for legacy and cloud events.
44  *
45  * @param <T> valid legacy event to be published over the wire.
46  */
47 @Configuration
48 @EnableKafka
49 @RequiredArgsConstructor
50 public class KafkaConfig<T> {
51
52     private final KafkaProperties kafkaProperties;
53
54     /**
55      * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
56      * application.yml and replaces value-serializer by JsonSerializer.
57      *
58      * @return legacy event producer instance.
59      */
60     @Bean
61     public ProducerFactory<String, T> legacyEventProducerFactory() {
62         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
63         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
64         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
65     }
66
67     /**
68      * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
69      * into application.yml and replaces deserializer-value by JsonDeserializer.
70      *
71      * @return an instance of legacy consumer factory.
72      */
73     @Bean
74     public ConsumerFactory<String, T> legacyEventConsumerFactory() {
75         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
76         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
77         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
78     }
79
80     /**
81      * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
82      *
83      * @return an instance of legacy Kafka template.
84      */
85     @Bean
86     @Primary
87     public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
88         final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
89         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
90         return kafkaTemplate;
91     }
92
93     /**
94      * A legacy concurrent kafka listener container factory.
95      *
96      * @return instance of Concurrent kafka listener factory
97      */
98     @Bean
99     public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
100         final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
101                 new ConcurrentKafkaListenerContainerFactory<>();
102         containerFactory.setConsumerFactory(legacyEventConsumerFactory());
103         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
104         return containerFactory;
105     }
106
107     /**
108      * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
109      * application.yml with CloudEventSerializer.
110      *
111      * @return cloud event producer instance.
112      */
113     @Bean
114     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
115         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
116         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
117     }
118
119     /**
120      * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
121      * into application.yml having CloudEventDeserializer as deserializer-value.
122      *
123      * @return an instance of cloud consumer factory.
124      */
125     @Bean
126     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
127         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
128         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
129     }
130
131
132     /**
133      * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
134      *
135      * @return an instance of cloud Kafka template.
136      */
137     @Bean
138     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
139         final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
140         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
141         return kafkaTemplate;
142     }
143
144     /**
145      * A Concurrent CloudEvent kafka listener container factory.
146      *
147      * @return instance of Concurrent kafka listener factory
148      */
149     @Bean
150     public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
151                                         cloudEventConcurrentKafkaListenerContainerFactory() {
152         final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
153                 new ConcurrentKafkaListenerContainerFactory<>();
154         containerFactory.setConsumerFactory(cloudEventConsumerFactory());
155         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
156         return containerFactory;
157     }
158
159 }