Add integration test for extending API: Get Module Definitions
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / config / kafka / KafkaConfig.java
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2023 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.config.kafka;
22
23 import io.cloudevents.CloudEvent;
24 import java.util.Map;
25 import lombok.RequiredArgsConstructor;
26 import org.apache.kafka.clients.producer.ProducerConfig;
27 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
28 import org.springframework.context.annotation.Bean;
29 import org.springframework.context.annotation.Configuration;
30 import org.springframework.context.annotation.Primary;
31 import org.springframework.kafka.annotation.EnableKafka;
32 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
33 import org.springframework.kafka.core.ConsumerFactory;
34 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
35 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
36 import org.springframework.kafka.core.KafkaTemplate;
37 import org.springframework.kafka.core.ProducerFactory;
38 import org.springframework.kafka.support.serializer.JsonDeserializer;
39 import org.springframework.kafka.support.serializer.JsonSerializer;
40
41 /**
42  * kafka Configuration for legacy and cloud events.
43  *
44  * @param <T> valid legacy event to be published over the wire.
45  */
46 @Configuration
47 @EnableKafka
48 @RequiredArgsConstructor
49 public class KafkaConfig<T> {
50
51     private final KafkaProperties kafkaProperties;
52
53     /**
54      * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
55      * application.yml and replaces value-serializer by JsonSerializer.
56      *
57      * @return legacy event producer instance.
58      */
59     @Bean
60     public ProducerFactory<String, T> legacyEventProducerFactory() {
61         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
62         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
63         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
64     }
65
66     /**
67      * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
68      * into application.yml and replaces deserializer-value by JsonDeserializer.
69      *
70      * @return an instance of legacy consumer factory.
71      */
72     @Bean
73     public ConsumerFactory<String, T> legacyEventConsumerFactory() {
74         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
75         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
76         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
77     }
78
79     /**
80      * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
81      *
82      * @return an instance of legacy Kafka template.
83      */
84     @Bean
85     @Primary
86     public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
87         final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
88         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
89         return kafkaTemplate;
90     }
91
92     /**
93      * A legacy concurrent kafka listener container factory.
94      *
95      * @return instance of Concurrent kafka listener factory
96      */
97     @Bean
98     public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
99         final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
100                 new ConcurrentKafkaListenerContainerFactory<>();
101         containerFactory.setConsumerFactory(legacyEventConsumerFactory());
102         return containerFactory;
103     }
104
105     /**
106      * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
107      * application.yml with CloudEventSerializer.
108      *
109      * @return cloud event producer instance.
110      */
111     @Bean
112     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
113         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
114         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
115     }
116
117     /**
118      * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
119      * into application.yml having CloudEventDeserializer as deserializer-value.
120      *
121      * @return an instance of cloud consumer factory.
122      */
123     @Bean
124     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
125         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
126         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
127     }
128
129
130     /**
131      * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
132      *
133      * @return an instance of cloud Kafka template.
134      */
135     @Bean
136     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
137         final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
138         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
139         return kafkaTemplate;
140     }
141
142     /**
143      * A Concurrent CloudEvent kafka listener container factory.
144      *
145      * @return instance of Concurrent kafka listener factory
146      */
147     @Bean
148     public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
149                                         cloudEventConcurrentKafkaListenerContainerFactory() {
150         final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
151                 new ConcurrentKafkaListenerContainerFactory<>();
152         containerFactory.setConsumerFactory(cloudEventConsumerFactory());
153         return containerFactory;
154     }
155
156 }