3 * * ============LICENSE_START=======================================================
5 * * ================================================================================
6 * * Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
7 * * ================================================================================
8 * * Modifications Copyright (c) 2021 Nokia
9 * * ================================================================================
10 * * Licensed under the Apache License, Version 2.0 (the "License");
11 * * you may not use this file except in compliance with the License.
12 * * You may obtain a copy of the License at
14 * * http://www.apache.org/licenses/LICENSE-2.0
16 * * Unless required by applicable law or agreed to in writing, software
17 * * distributed under the License is distributed on an "AS IS" BASIS,
18 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * * See the License for the specific language governing permissions and
20 * * limitations under the License.
21 * * ============LICENSE_END=========================================================
26 package org.onap.blueprintgenerator.service.common;
28 import java.util.LinkedHashMap;
30 import java.util.TreeMap;
31 import org.onap.blueprintgenerator.constants.Constants;
32 import org.onap.blueprintgenerator.model.common.BaseStream;
33 import org.onap.blueprintgenerator.model.common.Dmaap;
34 import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec;
35 import org.onap.blueprintgenerator.model.componentspec.common.Publishes;
36 import org.onap.blueprintgenerator.model.componentspec.common.Subscribes;
37 import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
38 import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.stereotype.Service;
43 * @author : Joanna Jeremicz
44 * @date 01/15/2021 Application: ONAP - Blueprint Generator. Common ONAP service that creates publish and subscribe streams.
47 @Service("streamService")
48 public class StreamService {
// Helper used by both stream-building methods for Kafka-type entries: it supplies the
// extra inputs and the app-properties streams for a given config key.
// NOTE(review): the injection annotation line is not visible in this view; presumably
// Spring-autowired (Autowired is imported above) - confirm against the full file.
51 private KafkaStreamService kafkaStreamsService;
54 * Creates the publish streams for the given inputs and component specification
56 * @param onapComponentSpec Onap Component Specification
57 * @param blueprintHelperService Blueprint Helper Service
58 * @param dmaapService Dmaap Service
59 * @param inputs Inputs
60 * @param isDmaap flag forwarded unchanged to the Dmaap service when creating data-router and message-router entries
// Builds a key-sorted map (TreeMap) of publish streams for every "publishes" entry of the
// component spec and returns it. Data-router and message-router entries are stored under
// their config key; Kafka entries are added via kafkaStreamsService.
// NOTE(review): several short lines of this method (closing braces, the trailing isDmaap
// parameter, parts of the cast statements) are not visible in this view - confirm any
// change against the full file.
63 public Map<String, BaseStream> createStreamPublishes(
64 OnapComponentSpec onapComponentSpec,
65 BlueprintHelperService blueprintHelperService,
66 DmaapService dmaapService,
67 Map<String, LinkedHashMap<String, Object>> inputs,
70 Map<String, BaseStream> streamPublishes = new TreeMap<>();
// Nothing to do when the spec has no streams section or no publishes list:
// the empty map is returned rather than null.
71 if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) {
72 return streamPublishes;
// Each entry is dispatched on its type: data router, message router, or Kafka.
// No fallback branch is visible here, so other types appear to contribute nothing.
75 for (Publishes publishes : onapComponentSpec.getStreams().getPublishes()) {
76 if (blueprintHelperService.isDataRouterType(publishes.getType())) {
77 String config = publishes.getConfig_key();
// Data-router name is the config key with the Constants._FEED suffix appended.
78 String name = config + Constants._FEED;
79 Map<String, Object> dmaapDataRouterResponse =
80 dmaapService.createDmaapDataRouter(inputs, config, name, isDmaap);
// NOTE(review): unchecked cast of a value taken from the untyped response map; the
// assignment target and the key being read are on lines not visible here - confirm.
82 (Map<String, LinkedHashMap<String, Object>>) dmaapDataRouterResponse
// The "dmaap" entry of the response is tagged with the entry's type and stored
// under the config key.
84 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
85 dmaap.setType(publishes.getType());
86 streamPublishes.put(config, dmaap);
87 } else if (blueprintHelperService.isMessageRouterType(publishes.getType())) {
88 String config = publishes.getConfig_key();
// Message-router name is the config key with the Constants._TOPIC suffix appended.
89 String name = config + Constants._TOPIC;
// 'p' is passed for publishes (createStreamSubscribes passes 's'); the same name
// value is passed for both name arguments.
// NOTE(review): the receiver of the call below is on a line not visible here.
90 Map<String, Object> dmaapDataRouterResponse =
92 .createDmaapMessageRouter(inputs, config, 'p', name, name, isDmaap);
// NOTE(review): same unchecked cast as in the data-router branch; target and key
// are not visible here - confirm.
94 (Map<String, LinkedHashMap<String, Object>>) dmaapDataRouterResponse
96 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
97 dmaap.setType(publishes.getType());
98 streamPublishes.put(config, dmaap);
99 } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) {
// Kafka entries: the generated inputs are merged in place into the map currently
// referenced by inputs, and the Kafka app-properties streams are merged into the result.
100 inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key()));
101 streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key()));
104 return streamPublishes;
108 * Creates the subscribe streams for the given inputs and component specification
110 * @param onapComponentSpec Onap Component Specification
111 * @param blueprintHelperService Blueprint Helper Service
112 * @param dmaapService Dmaap Service
113 * @param inputs Inputs
114 * @param isDmaap flag forwarded unchanged to the Dmaap service when creating data-router and message-router entries
// Builds a key-sorted map (TreeMap) of subscribe streams for every "subscribes" entry of
// the component spec and returns it. Data-router and message-router entries are stored
// under their config key; Kafka entries are added via kafkaStreamsService.
// NOTE(review): several short lines of this method (closing braces, the trailing isDmaap
// parameter, parts of the cast statements) are not visible in this view - confirm any
// change against the full file.
117 public Map<String, BaseStream> createStreamSubscribes(
118 OnapComponentSpec onapComponentSpec,
119 BlueprintHelperService blueprintHelperService,
120 DmaapService dmaapService,
121 Map<String, LinkedHashMap<String, Object>> inputs,
124 Map<String, BaseStream> streamSubscribes = new TreeMap<>();
// Nothing to do when the spec has no streams section or no subscribes list:
// the empty map is returned rather than null.
125 if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) {
126 return streamSubscribes;
// Each entry is dispatched on its type: data router, message router, or Kafka.
// No fallback branch is visible here, so other types appear to contribute nothing.
129 for (Subscribes subscribes : onapComponentSpec.getStreams().getSubscribes()) {
130 if (blueprintHelperService.isDataRouterType(subscribes.getType())) {
131 String config = subscribes.getConfig_key();
// Data-router name is the config key with the Constants._FEED suffix appended.
132 String name = config + Constants._FEED;
133 Map<String, Object> dmaapDataRouterResponse =
134 dmaapService.createDmaapDataRouter(inputs, config, name, isDmaap);
// NOTE(review): unchecked cast of a value taken from the untyped response map; the
// assignment target and the key being read are on lines not visible here - confirm.
136 (Map<String, LinkedHashMap<String, Object>>) dmaapDataRouterResponse
// The "dmaap" entry of the response is tagged with the entry's type and stored
// under the config key.
138 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
139 dmaap.setType(subscribes.getType());
140 streamSubscribes.put(config, dmaap);
141 } else if (blueprintHelperService.isMessageRouterType(subscribes.getType())) {
142 String config = subscribes.getConfig_key();
// Message-router name is the config key with the Constants._TOPIC suffix appended.
143 String name = config + Constants._TOPIC;
// 's' is passed for subscribes (createStreamPublishes passes 'p'); the same name
// value is passed for both name arguments.
// NOTE(review): the receiver of the call below is on a line not visible here.
144 Map<String, Object> dmaapDataRouterResponse =
146 .createDmaapMessageRouter(inputs, config, 's', name, name, isDmaap);
// NOTE(review): same unchecked cast as in the data-router branch; target and key
// are not visible here - confirm.
148 (Map<String, LinkedHashMap<String, Object>>) dmaapDataRouterResponse
150 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
151 dmaap.setType(subscribes.getType());
152 streamSubscribes.put(config, dmaap);
153 } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) {
// Kafka entries: the generated inputs are merged in place into the map currently
// referenced by inputs, and the Kafka app-properties streams are merged into the result.
154 inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key()));
155 streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key()));
160 return streamSubscribes;