c746bfec141166f71c0117090d0493ab2508cbc8
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterTestsUtils.*;
24
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import com.google.gson.JsonPrimitive;
28 import io.vavr.collection.List;
29 import java.time.Duration;
30 import org.junit.jupiter.api.BeforeAll;
31 import org.junit.jupiter.api.Test;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
38 import org.testcontainers.containers.DockerComposeContainer;
39 import org.testcontainers.junit.jupiter.Container;
40 import org.testcontainers.junit.jupiter.Testcontainers;
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43 import reactor.test.StepVerifier;
44
45 @Testcontainers
46 class MessageRouterPublisherIT {
47     @Container
48     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
49     private static final Duration TIMEOUT = Duration.ofSeconds(10);
50     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
51             + "{"
52             + "\"mrstatus\":5007,"
53             + "\"helpURL\":\"http://onap.readthedocs.io\","
54             + "\"message\":\"Error while publishing data to topic.:%s."
55             + "Successfully published number of messages :0."
56             + "Expected { to start an object.\",\"status\":400"
57             + "}";
58     private static String EVENTS_PATH;
59     private final MessageRouterPublisher publisher = DmaapClientFactory
60             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
61     private MessageRouterSubscriber subscriber = DmaapClientFactory
62             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
63
64     @BeforeAll
65     static void setUp() {
66         EVENTS_PATH = String.format("http://%s:%d/events",
67                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
68                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
69                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
70                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
71     }
72
73     @Test
74     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
75         //given
76         final String topic = "TOPIC";
77         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
78                 "{\"differentMessage\":\"message2\"}");
79         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
80         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
81         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
82
83         //when
84         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
85
86         //then
87         StepVerifier.create(result)
88                 .expectNext(expectedResponse)
89                 .expectComplete()
90                 .verify(TIMEOUT);
91     }
92
93     @Test
94     void publisher_shouldHandleBadRequestError(){
95         //given
96         final String topic = "TOPIC2";
97         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
98         final Flux<JsonPrimitive> messageBatch = plainBatch(threePlainTextMessages);
99         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
100         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
101                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
102
103         //when
104         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
105
106         //then
107         StepVerifier.create(result)
108                 .expectNext(expectedResponse)
109                 .expectComplete()
110                 .verify(TIMEOUT);
111     }
112
113     @Test
114     void publisher_shouldSuccessfullyPublishSingleMessage(){
115         final String topic = "TOPIC3";
116         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
117         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
118         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
119         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
120         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
121         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
122         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
123
124         //when
125         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
126         Mono<MessageRouterSubscribeResponse> response = publisher
127                 .put(publishRequest, jsonMessageBatch)
128                 .then(subscriber.get(subscribeRequest));
129
130         //then
131         StepVerifier.create(response)
132                 .expectNext(expectedResponse)
133                 .expectComplete()
134                 .verify();
135     }
136
137     @Test
138     void publisher_shouldSuccessfullyPublishMultipleMessages(){
139         final String topic = "TOPIC4";
140         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
141         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
142                 "{\"differentMessage\":\"message2\"}");
143         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
144         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
145         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
146         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
147         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
148
149         //when
150         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
151         Mono<MessageRouterSubscribeResponse> response = publisher
152                 .put(publishRequest, jsonMessageBatch)
153                 .then(subscriber.get(subscribeRequest));
154
155         //then
156         StepVerifier.create(response)
157                 .expectNext(expectedResponse)
158                 .expectComplete()
159                 .verify();
160     }
161 }