3 * * ============LICENSE_START=======================================================
5 * * ================================================================================
6 * * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
7 * * ================================================================================
8 * * Licensed under the Apache License, Version 2.0 (the "License");
9 * * you may not use this file except in compliance with the License.
10 * * You may obtain a copy of the License at
12 * * http://www.apache.org/licenses/LICENSE-2.0
14 * * Unless required by applicable law or agreed to in writing, software
15 * * distributed under the License is distributed on an "AS IS" BASIS,
16 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * * See the License for the specific language governing permissions and
18 * * limitations under the License.
19 * * ============LICENSE_END=========================================================
24 package org.onap.blueprintgenerator.service.common.kafka;
27 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME;
28 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME;
29 import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME;
31 import java.util.LinkedHashMap;
33 import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Service;
/**
 * @author : Tomasz Wrobel
 * @date 18/01/2021
 *
 * Application: ONAP - Blueprint Generator.
 * Common ONAP service used to create the Kafka Stream application config object and the Kafka Stream inputs.
 */
43 public class KafkaStreamService {
/** Suffix appended to a stream name to form the publish input name and the publish topic name. */
45 private static final String PUBLISH_URL_SUFFIX = "_publish_url";
/** Suffix appended to a stream name to form the subscribe input name and the subscribe topic name. */
46 private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url";
/** Value passed to createStringInput for the per-stream input entry. */
47 private static final String DEFAULT_STREAM_URL = "sample_stream_url";
/** Value passed to createStringInput for the KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME entry. */
48 private static final String DEFAULT_BOOTSTRAP_SERVER = "message-router-kafka:9092";
/** Value passed to createStringInput for the AFF_KAFKA_USER_INPUT_NAME entry. */
49 private static final String DEFAULT_AAF_USER = "admin";
/**
 * Value passed to createStringInput for the AAF_KAFKA_PASSWORD_INPUT_NAME entry.
 * NOTE(review): this is a credential-like literal committed in source; presumably a placeholder
 * that is overridden at deployment time - confirm.
 */
50 private static final String DEFAULT_AAF_PASSWORD = "admin_secret";
/**
 * Helper whose createStringInput method builds every input entry produced by this class.
 * NOTE(review): no injection annotation is visible on this field in this view, although
 * Autowired is imported - confirm that the field is wired by Spring.
 */
53 private BlueprintHelperService blueprintHelperService;
57 * Creates publish stream inputs for given streamName
59 * @param streamName Stream name
62 public Map<String, LinkedHashMap<String, Object>> createStreamPublishInputs(String streamName) {
63 return createStreamInputs(streamName + PUBLISH_URL_SUFFIX);
67 * Creates subscribe stream inputs for given streamName
69 * @param streamName Stream name
72 public Map<String, LinkedHashMap<String, Object>> createStreamSubscribeInputs(String streamName) {
73 return createStreamInputs(streamName + SUBSCRIBE_URL_SUFFIX);
77 * Creates Application properties publish stream object for given streamName
79 * @param streamName Stream name
82 public Map<String, KafkaStream> createAppPropertiesPublish(String streamName) {
84 LinkedHashMap<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
85 KafkaStream kafkaStream = createAppProperties(streamName, PUBLISH_URL_SUFFIX);
87 kafkaStreamMap.put(streamName, kafkaStream);
89 return kafkaStreamMap;
93 * Creates Application properties subscribe stream object for given streamName
95 * @param streamName Stream name
98 public Map<String, KafkaStream> createAppPropertiesSubscribe(String streamName) {
100 LinkedHashMap<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
101 KafkaStream kafkaStream = createAppProperties(streamName, SUBSCRIBE_URL_SUFFIX);
103 kafkaStreamMap.put(streamName, kafkaStream);
105 return kafkaStreamMap;
108 private KafkaStream createAppProperties(String streamName, String urlSuffix) {
109 String topicName = streamName + urlSuffix;
111 return new KafkaStream(topicName);
114 private Map<String, LinkedHashMap<String, Object>> createStreamInputs(String streamName) {
115 LinkedHashMap<String, LinkedHashMap<String, Object>> streamInputs = createBaseInputs();
116 LinkedHashMap<String, Object> stream =
117 blueprintHelperService.createStringInput(DEFAULT_STREAM_URL);
118 streamInputs.put(streamName, stream);
122 private LinkedHashMap<String, LinkedHashMap<String, Object>> createBaseInputs() {
123 LinkedHashMap<String, LinkedHashMap<String, Object>> baseInputs = new LinkedHashMap<>();
125 LinkedHashMap<String, Object> kafka_message_router = blueprintHelperService
126 .createStringInput(DEFAULT_BOOTSTRAP_SERVER);
127 baseInputs.put(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME, kafka_message_router);
129 LinkedHashMap<String, Object> kafka_username = blueprintHelperService.createStringInput(DEFAULT_AAF_USER);
130 baseInputs.put(AFF_KAFKA_USER_INPUT_NAME, kafka_username);
132 LinkedHashMap<String, Object> kafka_password = blueprintHelperService.createStringInput(DEFAULT_AAF_PASSWORD);
133 baseInputs.put(AAF_KAFKA_PASSWORD_INPUT_NAME, kafka_password);