Repackage Inventory Feature
[cps.git] / cps-ncmp-service / src / main / java / org / onap / cps / ncmp / api / impl / config / kafka / KafkaConfig.java
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2023-2024 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.config.kafka;
22
23 import io.cloudevents.CloudEvent;
24 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingConsumerInterceptor;
25 import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
26 import java.time.Duration;
27 import java.util.Map;
28 import lombok.RequiredArgsConstructor;
29 import org.apache.kafka.clients.consumer.ConsumerConfig;
30 import org.apache.kafka.clients.producer.ProducerConfig;
31 import org.springframework.beans.factory.annotation.Value;
32 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
33 import org.springframework.boot.ssl.SslBundles;
34 import org.springframework.context.annotation.Bean;
35 import org.springframework.context.annotation.Configuration;
36 import org.springframework.context.annotation.Primary;
37 import org.springframework.kafka.annotation.EnableKafka;
38 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39 import org.springframework.kafka.core.ConsumerFactory;
40 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
41 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
42 import org.springframework.kafka.core.KafkaTemplate;
43 import org.springframework.kafka.core.ProducerFactory;
44 import org.springframework.kafka.support.serializer.JsonDeserializer;
45 import org.springframework.kafka.support.serializer.JsonSerializer;
46
47 /**
48  * kafka Configuration for legacy and cloud events.
49  *
50  * @param <T> valid legacy event to be published over the wire.
51  */
52 @Configuration
53 @EnableKafka
54 @RequiredArgsConstructor
55 public class KafkaConfig<T> {
56
57     private final KafkaProperties kafkaProperties;
58
59     @Value("${cps.tracing.enabled:false}")
60     private boolean tracingEnabled;
61
62     private static final SslBundles NO_SSL = null;
63
64     /**
65      * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
66      * application.yml and replaces value-serializer by JsonSerializer.
67      *
68      * @return legacy event producer instance.
69      */
70     @Bean
71     public ProducerFactory<String, T> legacyEventProducerFactory() {
72         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
73         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
74         if (tracingEnabled) {
75             producerConfigProperties.put(
76                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
77         }
78         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
79     }
80
81     /**
82      * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
83      * into application.yml and replaces deserializer-value by JsonDeserializer.
84      *
85      * @return an instance of legacy consumer factory.
86      */
87     @Bean
88     public ConsumerFactory<String, T> legacyEventConsumerFactory() {
89         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
90         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
91         if (tracingEnabled) {
92             consumerConfigProperties.put(
93                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
94         }
95         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
96     }
97
98     /**
99      * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
100      *
101      * @return an instance of legacy Kafka template.
102      */
103     @Bean
104     @Primary
105     public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
106         final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
107         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
108         if (tracingEnabled) {
109             kafkaTemplate.setObservationEnabled(true);
110         }
111         return kafkaTemplate;
112     }
113
114     /**
115      * A legacy concurrent kafka listener container factory.
116      *
117      * @return instance of Concurrent kafka listener factory
118      */
119     @Bean
120     public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
121         final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
122                 new ConcurrentKafkaListenerContainerFactory<>();
123         containerFactory.setConsumerFactory(legacyEventConsumerFactory());
124         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
125         if (tracingEnabled) {
126             containerFactory.getContainerProperties().setObservationEnabled(true);
127         }
128         return containerFactory;
129     }
130
131     /**
132      * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
133      * application.yml with CloudEventSerializer.
134      *
135      * @return cloud event producer instance.
136      */
137     @Bean
138     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
139         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
140         if (tracingEnabled) {
141             producerConfigProperties.put(
142                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
143         }
144         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
145     }
146
147     /**
148      * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
149      * into application.yml having CloudEventDeserializer as deserializer-value.
150      *
151      * @return an instance of cloud consumer factory.
152      */
153     @Bean
154     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
155         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
156         if (tracingEnabled) {
157             consumerConfigProperties.put(
158                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
159         }
160         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
161     }
162
163
164     /**
165      * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
166      *
167      * @return an instance of cloud Kafka template.
168      */
169     @Bean
170     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
171         final KafkaTemplate<String, CloudEvent> kafkaTemplate =
172             new KafkaTemplate<>(cloudEventProducerFactory());
173         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
174         if (tracingEnabled) {
175             kafkaTemplate.setObservationEnabled(true);
176         }
177         return kafkaTemplate;
178     }
179
180     /**
181      * A Concurrent CloudEvent kafka listener container factory.
182      *
183      * @return instance of Concurrent kafka listener factory
184      */
185     @Bean
186     public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
187                                         cloudEventConcurrentKafkaListenerContainerFactory() {
188         final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
189                 new ConcurrentKafkaListenerContainerFactory<>();
190         containerFactory.setConsumerFactory(cloudEventConsumerFactory());
191         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
192         if (tracingEnabled) {
193             containerFactory.getContainerProperties().setObservationEnabled(true);
194         }
195         return containerFactory;
196     }
197
198 }