Revert "[SDC-BE] Add kafka ssl config"
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / kafka / KafkaHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import com.google.gson.Gson;
23 import com.google.gson.JsonSyntaxException;
24 import fj.data.Either;
25 import lombok.Getter;
26 import lombok.Setter;
27 import org.apache.http.HttpStatus;
28 import org.apache.kafka.common.KafkaException;
29 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
30 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
31 import org.openecomp.sdc.be.config.BeEcompErrorManager;
32 import org.openecomp.sdc.be.config.ConfigurationManager;
33 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
34 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
35 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import org.springframework.stereotype.Component;
37
38 /**
39  * Utility class that provides a handler for Kafka interactions
40  */
41 @Component
42 public class KafkaHandler {
43
44     private static final Logger log = Logger.getLogger(KafkaHandler.class.getName());
45     private final Gson gson = new Gson();
46
47     private SdcKafkaConsumer sdcKafkaConsumer;
48
49     private SdcKafkaProducer sdcKafkaProducer;
50
51     @Setter
52     private boolean isKafkaActive;
53
54     private DistributionEngineConfiguration deConfiguration;
55
56     public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) {
57         this.sdcKafkaConsumer = sdcKafkaConsumer;
58         this.sdcKafkaProducer = sdcKafkaProducer;
59         this.isKafkaActive = isKafkaActive;
60     }
61
62     public KafkaHandler() {
63         isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false"));
64         deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
65     }
66
67     /**
68      * @return a user configuration whether Kafka is active for this client
69      */
70     public Boolean isKafkaActive() {
71         return isKafkaActive;
72     }
73
74     /**
75      * @param topicName The topic from which messages will be fetched
76      * @return Either A list of messages from a specific topic, or a specific error response
77      */
78     public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) {
79         try {
80             if(sdcKafkaConsumer == null){
81                 sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration);
82             }
83             sdcKafkaConsumer.subscribe(topicName);
84             Iterable<String> messages = sdcKafkaConsumer.poll(topicName);
85             log.info("Returning messages from topic {}", topicName);
86             return Either.left(messages);
87         } catch (KafkaException e) {
88             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage());
89             log.error("Failed to fetch from kafka for topic: {}", topicName, e);
90             CambriaErrorResponse cambriaErrorResponse =
91                 new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR,
92                     HttpStatus.SC_INTERNAL_SERVER_ERROR);
93             return Either.right(cambriaErrorResponse);
94         }
95     }
96
97     /**
98      * Publish notification message to a given topic and flush
99      *
100      * @param topicName The topic to which the message should be published
101      * @param data      The data to publish to the topic specified
102      * @return CambriaErrorResponse a status response on success or any errors thrown
103      */
104     public CambriaErrorResponse sendNotification(String topicName, INotificationData data) {
105         CambriaErrorResponse response;
106         if(sdcKafkaProducer == null){
107             sdcKafkaProducer = new SdcKafkaProducer(deConfiguration);
108         }
109        try {
110            String json = gson.toJson(data);
111            log.info("Before sending to topic {}", topicName);
112            sdcKafkaProducer.send(json, topicName);
113        }
114        catch(KafkaException e){
115            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
116            log.error("Failed to send message . Exception {}", e.getMessage());
117
118            return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
119        } catch (JsonSyntaxException e) {
120            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
121            log.error("Failed to convert data to json: {}", data, e);
122
123             return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
124         } finally {
125             try {
126                 sdcKafkaProducer.flush();
127                 response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
128             } catch (KafkaException | IllegalArgumentException e) {
129                 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
130                 log.error("Failed to flush sdcKafkaProducer", e);
131
132                 response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
133             }
134         }
135
136         return response;
137     }
138 }