[SDC-BE] Add kafka ssl config
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / kafka / KafkaHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import com.google.gson.Gson;
23 import com.google.gson.JsonSyntaxException;
24 import fj.data.Either;
25 import lombok.Setter;
26 import org.apache.http.HttpStatus;
27 import org.apache.kafka.common.KafkaException;
28 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
29 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
30 import org.openecomp.sdc.be.config.BeEcompErrorManager;
31 import org.openecomp.sdc.be.config.ConfigurationManager;
32 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
34 import org.openecomp.sdc.common.log.wrappers.Logger;
35 import org.springframework.stereotype.Component;
36
37 /**
38  * Utility class that provides a handler for Kafka interactions
39  */
40 @Component
41 public class KafkaHandler {
42
43     private static final Logger log = Logger.getLogger(KafkaHandler.class.getName());
44     private final Gson gson = new Gson();
45
46     private SdcKafkaConsumer sdcKafkaConsumer;
47
48     private SdcKafkaProducer sdcKafkaProducer;
49
50     @Setter
51     private boolean isKafkaActive;
52
53     private DistributionEngineConfiguration deConfiguration;
54
55     public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) {
56         this.sdcKafkaConsumer = sdcKafkaConsumer;
57         this.sdcKafkaProducer = sdcKafkaProducer;
58         this.isKafkaActive = isKafkaActive;
59     }
60
61     public KafkaHandler() {
62         isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false"));
63         deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
64     }
65
66     /**
67      * @return a user configuration whether Kafka is active for this client
68      */
69     public Boolean isKafkaActive() {
70         return isKafkaActive;
71     }
72
73     /**
74      * @param topicName The topic from which messages will be fetched
75      * @return Either A list of messages from a specific topic, or a specific error response
76      */
77     public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) {
78         try {
79             if(sdcKafkaConsumer == null){
80                 sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration);
81             }
82             sdcKafkaConsumer.subscribe(topicName);
83             Iterable<String> messages = sdcKafkaConsumer.poll(topicName);
84             log.info("Returning messages from topic {}", topicName);
85             return Either.left(messages);
86         } catch (KafkaException e) {
87             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage());
88             log.error("Failed to fetch from kafka for topic: {}", topicName, e);
89             CambriaErrorResponse cambriaErrorResponse =
90                 new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR,
91                     HttpStatus.SC_INTERNAL_SERVER_ERROR);
92             return Either.right(cambriaErrorResponse);
93         }
94     }
95
96     /**
97      * Publish notification message to a given topic and flush
98      *
99      * @param topicName The topic to which the message should be published
100      * @param data      The data to publish to the topic specified
101      * @return CambriaErrorResponse a status response on success or any errors thrown
102      */
103     public CambriaErrorResponse sendNotification(String topicName, INotificationData data) {
104         CambriaErrorResponse response;
105         if(sdcKafkaProducer == null){
106             sdcKafkaProducer = new SdcKafkaProducer(deConfiguration);
107         }
108        try {
109            String json = gson.toJson(data);
110            log.info("Before sending to topic {}", topicName);
111            sdcKafkaProducer.send(json, topicName);
112        }
113        catch(KafkaException e){
114            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
115            log.error("Failed to send message . Exception {}", e.getMessage());
116
117            return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
118        } catch (JsonSyntaxException e) {
119            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
120            log.error("Failed to convert data to json: {}", data, e);
121
122             return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
123         } finally {
124             try {
125                 sdcKafkaProducer.flush();
126                 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
127             } catch (KafkaException | IllegalArgumentException e) {
128                 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
129                 log.error("Failed to flush sdcKafkaProducer", e);
130
131                 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
132             }
133         }
134
135         return response;
136     }
137 }