167df5a98dd1933689a91752ac809fb3a6a65e95
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / config / kafka / KafkaConfig.java
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2023-2024 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.config.kafka;
22
23 import io.cloudevents.CloudEvent;
24 import java.time.Duration;
25 import java.util.Map;
26 import lombok.RequiredArgsConstructor;
27 import org.apache.kafka.clients.producer.ProducerConfig;
28 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
29 import org.springframework.boot.ssl.SslBundles;
30 import org.springframework.context.annotation.Bean;
31 import org.springframework.context.annotation.Configuration;
32 import org.springframework.context.annotation.Primary;
33 import org.springframework.kafka.annotation.EnableKafka;
34 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
35 import org.springframework.kafka.core.ConsumerFactory;
36 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
37 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
38 import org.springframework.kafka.core.KafkaTemplate;
39 import org.springframework.kafka.core.ProducerFactory;
40 import org.springframework.kafka.support.serializer.JsonDeserializer;
41 import org.springframework.kafka.support.serializer.JsonSerializer;
42
43 /**
44  * kafka Configuration for legacy and cloud events.
45  *
46  * @param <T> valid legacy event to be published over the wire.
47  */
48 @Configuration
49 @EnableKafka
50 @RequiredArgsConstructor
51 public class KafkaConfig<T> {
52
53     private final KafkaProperties kafkaProperties;
54
55     private static final SslBundles NO_SSL = null;
56
57     /**
58      * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
59      * application.yml and replaces value-serializer by JsonSerializer.
60      *
61      * @return legacy event producer instance.
62      */
63     @Bean
64     public ProducerFactory<String, T> legacyEventProducerFactory() {
65         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
66         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
67         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
68     }
69
70     /**
71      * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
72      * into application.yml and replaces deserializer-value by JsonDeserializer.
73      *
74      * @return an instance of legacy consumer factory.
75      */
76     @Bean
77     public ConsumerFactory<String, T> legacyEventConsumerFactory() {
78         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
79         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
80         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
81     }
82
83     /**
84      * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
85      *
86      * @return an instance of legacy Kafka template.
87      */
88     @Bean
89     @Primary
90     public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
91         final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
92         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
93         return kafkaTemplate;
94     }
95
96     /**
97      * A legacy concurrent kafka listener container factory.
98      *
99      * @return instance of Concurrent kafka listener factory
100      */
101     @Bean
102     public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
103         final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
104                 new ConcurrentKafkaListenerContainerFactory<>();
105         containerFactory.setConsumerFactory(legacyEventConsumerFactory());
106         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
107         return containerFactory;
108     }
109
110     /**
111      * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
112      * application.yml with CloudEventSerializer.
113      *
114      * @return cloud event producer instance.
115      */
116     @Bean
117     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
118         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
119         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
120     }
121
122     /**
123      * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
124      * into application.yml having CloudEventDeserializer as deserializer-value.
125      *
126      * @return an instance of cloud consumer factory.
127      */
128     @Bean
129     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
130         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
131         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
132     }
133
134
135     /**
136      * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
137      *
138      * @return an instance of cloud Kafka template.
139      */
140     @Bean
141     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
142         final KafkaTemplate<String, CloudEvent> kafkaTemplate =
143             new KafkaTemplate<>(cloudEventProducerFactory());
144         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
145         return kafkaTemplate;
146     }
147
148     /**
149      * A Concurrent CloudEvent kafka listener container factory.
150      *
151      * @return instance of Concurrent kafka listener factory
152      */
153     @Bean
154     public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
155                                         cloudEventConcurrentKafkaListenerContainerFactory() {
156         final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
157                 new ConcurrentKafkaListenerContainerFactory<>();
158         containerFactory.setConsumerFactory(cloudEventConsumerFactory());
159         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
160         return containerFactory;
161     }
162
163 }