/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2021 Nokia. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.dcae.common.publishing;
22 import com.google.gson.JsonElement;
23 import com.google.gson.JsonObject;
24 import com.google.gson.JsonParser;
25 import io.vavr.collection.List;
26 import io.vavr.control.Option;
27 import org.jetbrains.annotations.NotNull;
28 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
37 import reactor.core.publisher.Flux;
39 import java.time.Duration;
41 public class DmaapRequestConfiguration {
43 private static final Long TIMEOUT_SECONDS = 10L;
44 private static final int RETRY_INTERVAL_IN_SECONDS = 1;
45 private static final int RETRY_COUNT = 1;
/** Private constructor: prevents instantiation from outside this class. */
private DmaapRequestConfiguration() {
}
50 static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
51 String topicUrl = createUrl(publisherConfig);
52 return ImmutableMessageRouterPublishRequest.builder()
53 .sinkDefinition(createMessageRouterSink(topicUrl))
54 .contentType(ContentType.APPLICATION_JSON)
55 .timeoutConfig(timeOutConfiguration(timeout))
59 static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
60 return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
63 static Flux<JsonObject> jsonBatch(List<String> messages) {
64 return Flux.fromIterable(getAsJsonObjects(messages));
67 static MessageRouterPublisherConfig retryConfiguration() {
68 return ImmutableMessageRouterPublisherConfig.builder()
69 .retryConfig(ImmutableDmaapRetryConfig.builder()
70 .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
71 .retryCount(RETRY_COUNT)
76 private static String createUrl(Option<PublisherConfig> publisherConfig) {
77 String hostAndPort = publisherConfig.get().getHostAndPort();
78 String topicName = publisherConfig.get().topic();
79 return String.format("http://%s/events/%s/",hostAndPort,topicName);
82 private static List<JsonObject> getAsJsonObjects(List<String> messages) {
83 return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
86 static List<JsonElement> getAsJsonElements(List<String> messages) {
87 return messages.map(JsonParser::parseString);
/**
 * Starts building the Message Router sink definition for a topic.
 *
 * NOTE(review): only the opening of the builder chain is visible here; the remaining builder
 * calls and the end of this method are not shown, so the code is left exactly as found.
 *
 * @param topicUrl topic URL; presumably passed to the sink builder, confirm against the full method
 */
90 static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
91 return ImmutableMessageRouterSink.builder()
98 private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
99 return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();