Fix sonar issues
[dcaegen2/platform.git] / mod / bpgenerator / onap / src / main / java / org / onap / blueprintgenerator / service / common / kafka / KafkaStreamService.java
1 /*
2  *
3  *  * ============LICENSE_START=======================================================
4  *  *  org.onap.dcae
5  *  *  ================================================================================
6  *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
7  *  *  ================================================================================
8  *  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  *  you may not use this file except in compliance with the License.
10  *  *  You may obtain a copy of the License at
11  *  *
12  *  *       http://www.apache.org/licenses/LICENSE-2.0
13  *  *
14  *  *  Unless required by applicable law or agreed to in writing, software
15  *  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  *  See the License for the specific language governing permissions and
18  *  *  limitations under the License.
19  *  *  ============LICENSE_END=========================================================
20  *
21  *
22  */
23
24 package org.onap.blueprintgenerator.service.common.kafka;
25
26
27 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME;
28 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME;
29 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME;
30
31 import java.util.LinkedHashMap;
32 import java.util.Map;
33 import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Service;
36
37 /**
38  * @author : Tomasz Wrobel
39  * @date 18/01/2021 Application: ONAP - Blueprint Generator Common ONAP Service used to create Kafka Stream application
40  * config object and Kafka Stream inputs
41  */
42 @Service
43 public class KafkaStreamService {
44
45     private static final String PUBLISH_URL_SUFFIX = "_publish_url";
46     private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url";
47     private static final String DEFAULT_STREAM_URL = "sample_stream_url";
48     private static final String DEFAULT_BOOTSTRAP_SERVER = "message-router-kafka:9092";
49     private static final String DEFAULT_AAF_USER = "admin";
50     private static final String DEFAULT_AAF_PASSWORD = "admin_secret";
51
52     @Autowired
53     private BlueprintHelperService blueprintHelperService;
54
55
56     /**
57      * Creates publish stream inputs for given streamName
58      *
59      * @param streamName Stream name
60      * @return
61      */
62     public Map<String, Map<String, Object>> createStreamPublishInputs(String streamName) {
63         return createStreamInputs(streamName + PUBLISH_URL_SUFFIX);
64     }
65
66     /**
67      * Creates subscribe stream inputs for given streamName
68      *
69      * @param streamName Stream name
70      * @return
71      */
72     public Map<String, Map<String, Object>> createStreamSubscribeInputs(String streamName) {
73         return createStreamInputs(streamName + SUBSCRIBE_URL_SUFFIX);
74     }
75
76     /**
77      * Creates Application properties publish stream object for given streamName
78      *
79      * @param streamName Stream name
80      * @return
81      */
82     public Map<String, KafkaStream> createAppPropertiesPublish(String streamName) {
83
84         Map<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
85         KafkaStream kafkaStream = createAppProperties(streamName, PUBLISH_URL_SUFFIX);
86
87         kafkaStreamMap.put(streamName, kafkaStream);
88
89         return kafkaStreamMap;
90     }
91
92     /**
93      * Creates Application properties subscribe stream object for given streamName
94      *
95      * @param streamName Stream name
96      * @return
97      */
98     public Map<String, KafkaStream> createAppPropertiesSubscribe(String streamName) {
99
100         Map<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
101         KafkaStream kafkaStream = createAppProperties(streamName, SUBSCRIBE_URL_SUFFIX);
102
103         kafkaStreamMap.put(streamName, kafkaStream);
104
105         return kafkaStreamMap;
106     }
107
108     private KafkaStream createAppProperties(String streamName, String urlSuffix) {
109         String topicName = streamName + urlSuffix;
110
111         return new KafkaStream(topicName);
112     }
113
114     private Map<String, Map<String, Object>> createStreamInputs(String streamName) {
115         Map<String, Map<String, Object>> streamInputs = createBaseInputs();
116         Map<String, Object> stream =
117             blueprintHelperService.createStringInput(DEFAULT_STREAM_URL);
118         streamInputs.put(streamName, stream);
119         return streamInputs;
120     }
121
122     private Map<String, Map<String, Object>> createBaseInputs() {
123         Map<String, Map<String, Object>> baseInputs = new LinkedHashMap<>();
124
125         Map<String, Object> kafka_message_router = blueprintHelperService
126             .createStringInput(DEFAULT_BOOTSTRAP_SERVER);
127         baseInputs.put(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME, kafka_message_router);
128
129         Map<String, Object> kafka_username = blueprintHelperService.createStringInput(DEFAULT_AAF_USER);
130         baseInputs.put(AFF_KAFKA_USER_INPUT_NAME, kafka_username);
131
132         Map<String, Object> kafka_password = blueprintHelperService.createStringInput(DEFAULT_AAF_PASSWORD);
133         baseInputs.put(AAF_KAFKA_PASSWORD_INPUT_NAME, kafka_password);
134
135         return baseInputs;
136     }
137 }