X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=mod%2Fbpgenerator%2Fonap%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fblueprintgenerator%2Fservice%2Fcommon%2FStreamService.java;h=17699a8aa758c1df0886f4ba4fa41d733ca73796;hb=72c2d38329865afa6692454b4fb90ab6f8a70638;hp=bd4cf8729a9ecdb1c89998bee48e6d580d4183bc;hpb=87e8c9cb8b1b335e213ad1e4adaabc3e622ebfb1;p=dcaegen2%2Fplatform.git diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java index bd4cf87..17699a8 100644 --- a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java +++ b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java @@ -29,39 +29,45 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; import org.onap.blueprintgenerator.constants.Constants; +import org.onap.blueprintgenerator.model.common.BaseStream; import org.onap.blueprintgenerator.model.common.Dmaap; import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec; import org.onap.blueprintgenerator.model.componentspec.common.Publishes; import org.onap.blueprintgenerator.model.componentspec.common.Subscribes; import org.onap.blueprintgenerator.service.base.BlueprintHelperService; +import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author : Joanna Jeremicz - * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service - * to create publishes and subscribes streams + * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service to create publishes and subscribes + * streams */ @Service("streamService") public class StreamService { + @Autowired + private KafkaStreamService kafkaStreamsService; + /** * Creates publishes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamPublishes( + public Map createStreamPublishes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, - Map> inputs, + Map> inputs, boolean isDmaap) { - Map streamPublishes = new TreeMap<>(); + Map streamPublishes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) { return streamPublishes; } @@ -69,27 +75,30 @@ public class StreamService { for (Publishes publishes : onapComponentSpec.getStreams().getPublishes()) { if (blueprintHelperService.isDataRouterType(publishes.getType())) { String config = publishes.getConfig_key(); - String name = config + Constants._FEED; + String name = config + Constants.A_FEED; Map dmaapDataRouterResponse = dmaapService.createDmaapDataRouter(inputs, config, name, isDmaap); inputs = - (Map>) dmaapDataRouterResponse + (Map>) dmaapDataRouterResponse .get("inputs"); Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(publishes.getType()); streamPublishes.put(config, dmaap); } else if (blueprintHelperService.isMessageRouterType(publishes.getType())) { String config = publishes.getConfig_key(); - String name = config + Constants._TOPIC; + String name = config + Constants.A_TOPIC; Map dmaapDataRouterResponse = dmaapService .createDmaapMessageRouter(inputs, config, 'p', name, name, isDmaap); inputs = - (Map>) dmaapDataRouterResponse + (Map>) dmaapDataRouterResponse .get("inputs"); Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(publishes.getType()); streamPublishes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key())); + streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key())); } } return streamPublishes; @@ -98,21 +107,21 @@ public class StreamService { /** * Creates subscribes stream for given Inputs and ComponentSpec * - * @param onapComponentSpec Onap Component Specification + * @param onapComponentSpec Onap Component Specification * @param blueprintHelperService Blueprint Helper Service - * @param dmaapService Dmaap Service - * @param inputs Inputs - * @param isDmaap Dmaap Argument + * @param dmaapService Dmaap Service + * @param inputs Inputs + * @param isDmaap Dmaap Argument * @return */ - public Map createStreamSubscribes( + public Map createStreamSubscribes( OnapComponentSpec onapComponentSpec, BlueprintHelperService blueprintHelperService, DmaapService dmaapService, - Map> inputs, + Map> inputs, boolean isDmaap) { - Map streamSubscribes = new TreeMap<>(); + Map streamSubscribes = new TreeMap<>(); if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) { return streamSubscribes; } @@ -120,28 +129,33 @@ public class StreamService { for (Subscribes subscribes : onapComponentSpec.getStreams().getSubscribes()) { if (blueprintHelperService.isDataRouterType(subscribes.getType())) { String config = subscribes.getConfig_key(); - String name = config + Constants._FEED; + String name = config + Constants.A_FEED; Map dmaapDataRouterResponse = dmaapService.createDmaapDataRouter(inputs, config, name, isDmaap); inputs = - (Map>) dmaapDataRouterResponse + (Map>) dmaapDataRouterResponse .get("inputs"); Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(subscribes.getType()); streamSubscribes.put(config, dmaap); } else if (blueprintHelperService.isMessageRouterType(subscribes.getType())) { String config = subscribes.getConfig_key(); - String name = config + Constants._TOPIC; + String name = config + Constants.A_TOPIC; Map dmaapDataRouterResponse = dmaapService .createDmaapMessageRouter(inputs, config, 's', name, name, isDmaap); inputs = - (Map>) dmaapDataRouterResponse + (Map>) dmaapDataRouterResponse .get("inputs"); Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap"); dmaap.setType(subscribes.getType()); streamSubscribes.put(config, dmaap); + } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) { + inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key())); + streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key())); } + + } return streamSubscribes; }