2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.vavr.collection.List;
29 import java.time.Duration;
30 import org.junit.jupiter.api.BeforeAll;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
38 import org.testcontainers.containers.DockerComposeContainer;
39 import org.testcontainers.junit.jupiter.Container;
40 import org.testcontainers.junit.jupiter.Testcontainers;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43 import reactor.test.StepVerifier;
// Test class that drives MessageRouterPublisher (and, for the round-trip checks,
// MessageRouterSubscriber) against the DMaaP endpoint whose host and port are
// obtained from CONTAINER.
// NOTE(review): the JUnit/Testcontainers annotations suggested by the imports are
// not visible in this excerpt — confirm they are present in the full file.
46 class MessageRouterPublisherIT {
// Docker Compose environment supplied by DMaapContainer; queried when EVENTS_PATH is
// built to resolve the DMaaP service host and port.
// NOTE(review): DockerComposeContainer appears to be used as a raw type here.
48 private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
// Ten-second duration. Its use is not visible in the lines shown here
// (presumably the StepVerifier verification timeout — confirm).
49 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Format string describing the expected DMaaP 400 response. It contains a single %s
// placeholder; the bad-request test passes it together with the topic name to
// errorPublishResponse (which presumably performs the substitution — confirm).
50 private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
52 + "\"mrstatus\":5007,"
53 + "\"helpURL\":\"http://onap.readthedocs.io\","
54 + "\"message\":\"Error while publishing data to topic.:%s."
55 + "Successfully published number of messages :0."
56 + "Expected { to start an object.\",\"status\":400"
// Base "http://<host>:<port>/events" URL. Not final because it is assigned later,
// once the container's host and port can be queried.
58 private static String EVENTS_PATH;
// Publisher under test, built from the default publisher configuration.
59 private final MessageRouterPublisher publisher = DmaapClientFactory
60 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
// Subscriber used to read back published messages, built from the default subscriber
// configuration. NOTE(review): unlike publisher, this field is not declared final.
61 private MessageRouterSubscriber subscriber = DmaapClientFactory
62 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Build the base events URL from the host and port that CONTAINER reports for the
// DMaaP service (looked up by service name and exposed port). Every test appends
// "/<topic>" to this value to form its topic URL.
// NOTE(review): the enclosing method header is not visible in this excerpt.
66 EVENTS_PATH = String.format("http://%s:%d/events",
67 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
68 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
69 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
70 DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
// Publishes a batch of two JSON object messages to "TOPIC" and checks that the
// publisher emits a response equal to the success response built from those same
// two messages.
74 void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
// Arrange: topic name, the raw JSON strings, the batch to send, the request aimed at
// "<EVENTS_PATH>/TOPIC", and the response expected back.
76 final String topic = "TOPIC";
77 final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
78 "{\"differentMessage\":\"message2\"}");
79 final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
80 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
81 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
// Act: hand the batch to the publisher.
84 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// Assert: the next emitted element must equal the expected response.
// NOTE(review): the remainder of this verifier chain is not visible in this excerpt.
87 StepVerifier.create(result)
88 .expectNext(expectedResponse)
// Publishes three plain-text (JsonPrimitive) messages to "TOPIC2" and checks that the
// publisher emits the error response built from DMAAP_400_ERROR_RESPONSE_FORMAT and
// the topic name.
94 void publisher_shouldHandleBadRequestError(){
// Arrange: the payload is deliberately plain text rather than JSON objects; the
// expected error format mentions "Expected { to start an object."
96 final String topic = "TOPIC2";
97 final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
98 final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
99 final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
100 final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
101 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
// Act: hand the plain-text batch to the publisher.
104 final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
// Assert: the next emitted element must equal the expected error response.
// NOTE(review): the remainder of this verifier chain is not visible in this excerpt.
107 StepVerifier.create(result)
108 .expectNext(expectedResponse)
// Round-trip check with one message: publishes a single JSON object to "TOPIC3",
// then reads from the same topic URL with the subscriber and expects a subscribe
// response carrying that same item.
114 void publisher_shouldSuccessfullyPublishSingleMessage(){
// Arrange: the publish and subscribe requests both target "<EVENTS_PATH>/TOPIC3";
// the subscribe request uses consumer group "sampleGroup" and consumer id "sampleId".
115 final String topic = "TOPIC3";
116 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
117 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
118 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
119 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
120 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
121 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
122 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Act: registerTopic is a helper defined outside this excerpt (presumably it makes
// sure the topic exists before the real publish — confirm). After that, the publish
// responses are discarded by then(...), and the resulting Mono carries only the
// subscriber's response.
125 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
126 Mono<MessageRouterSubscribeResponse> response = publisher
127 .put(publishRequest, jsonMessageBatch)
128 .then(subscriber.get(subscribeRequest));
// Assert: the subscriber's response must equal the expected one.
// NOTE(review): the remainder of this verifier chain is not visible in this excerpt.
131 StepVerifier.create(response)
132 .expectNext(expectedResponse)
// Round-trip check with two messages: publishes two JSON objects to "TOPIC4", then
// reads from the same topic URL with the subscriber and expects a subscribe response
// carrying both items.
138 void publisher_shouldSuccessfullyPublishMultipleMessages(){
// Arrange: the publish and subscribe requests both target "<EVENTS_PATH>/TOPIC4".
// NOTE(review): the local named singleJsonMessage actually holds TWO messages; the
// name is misleading and could be renamed.
139 final String topic = "TOPIC4";
140 final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
141 final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
142 "{\"differentMessage\":\"message2\"}");
143 final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
144 final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
145 final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
146 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
147 final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
// Act: registerTopic is a helper defined outside this excerpt (presumably it makes
// sure the topic exists before the real publish — confirm). After that, the publish
// responses are discarded by then(...), and the resulting Mono carries only the
// subscriber's response.
150 registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
151 Mono<MessageRouterSubscribeResponse> response = publisher
152 .put(publishRequest, jsonMessageBatch)
153 .then(subscriber.get(subscribeRequest));
// Assert: the subscriber's response must equal the expected one.
// NOTE(review): the remainder of this verifier chain is not visible in this excerpt.
156 StepVerifier.create(response)
157 .expectNext(expectedResponse)