7a31209ccc20e7cc87c56fd52dd05d5f4d0ec3dc
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.JsonElement;
24 import com.google.gson.JsonObject;
25 import io.vavr.collection.List;
26 import org.junit.jupiter.api.BeforeAll;
27 import org.junit.jupiter.api.Test;
28 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
29 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
30 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
33 import org.testcontainers.containers.DockerComposeContainer;
34 import org.testcontainers.junit.jupiter.Container;
35 import org.testcontainers.junit.jupiter.Testcontainers;
36 import reactor.core.publisher.Flux;
37 import reactor.core.publisher.Mono;
38 import reactor.test.StepVerifier;
39
40 import java.time.Duration;
41
42 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
43 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
44 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
45 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
46 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
47 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
48 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
49 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
50
51 @Testcontainers
52 class MessageRouterSubscriberIT {
53     private static final Duration TIMEOUT = Duration.ofSeconds(10);
54     private static final String CONSUMER_GROUP = "group1";
55     private static final String CONSUMER_ID = "consumer200";
56     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
57             "{" +
58             "\"mrstatus\":3001," +
59             "\"helpURL\":\"http://onap.readthedocs.io\"," +
60             "\"message\":\"No such topic exists.-[%s]\"," +
61             "\"status\":404" +
62             "}";
63
64     @Container
65     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
66
67     private static String EVENTS_PATH;
68
69     private MessageRouterPublisher publisher = DmaapClientFactory
70             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
71     private MessageRouterSubscriber subscriber = DmaapClientFactory
72             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
73
74
75     @BeforeAll
76     static void setUp() {
77         EVENTS_PATH = String.format("http://%s:%d/events",
78                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
79                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
80                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
81                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
82     }
83
84     @Test
85     void subscriber_shouldHandleNoSuchTopicException() {
86         //given
87         final String topic = "newTopic";
88         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
89                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
90         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
91                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
92
93         //when
94         Mono<MessageRouterSubscribeResponse> response = subscriber
95                 .get(mrSubscribeRequest);
96
97         //then
98         StepVerifier.create(response)
99                 .expectNext(expectedResponse)
100                 .expectComplete()
101                 .verify(TIMEOUT);
102     }
103
104     @Test
105     void subscriberShouldHandleSingleItemResponse(){
106         //given
107         final String topic = "TOPIC";
108         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
109         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
110         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
111
112         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
113         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
114         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
115         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
116
117         //when
118         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
119         Mono<MessageRouterSubscribeResponse> response = publisher
120                 .put(publishRequest, jsonMessageBatch)
121                 .then(subscriber.get(subscribeRequest));
122
123         //then
124         StepVerifier.create(response)
125                 .expectNext(expectedResponse)
126                 .expectComplete()
127                 .verify();
128     }
129
130     @Test
131     void subscriber_shouldHandleMultipleItemsResponse() {
132         //given
133         final String topic = "TOPIC2";
134         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
135         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
136         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
137
138         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
139                 "{\"differentMessage\":\"message2\"}");
140         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
141         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
142         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
143
144         //when
145         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
146         Mono<MessageRouterSubscribeResponse> response = publisher
147                 .put(publishRequest, jsonMessageBatch)
148                 .then(subscriber.get(subscribeRequest));
149
150         //then
151         StepVerifier.create(response)
152                 .expectNext(expectedResponse)
153                 .expectComplete()
154                 .verify();
155     }
156
157     @Test
158     void subscriber_shouldExtractItemsFromResponse() {
159         //given
160         final String topic = "TOPIC3";
161         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
162         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
163         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
164                 CONSUMER_GROUP, CONSUMER_ID);
165
166         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
167                 "{\"differentMessage\":\"message2\"}");
168         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
169
170         //when
171         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
172         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
173                 .thenMany(subscriber.getElements(subscribeRequest));
174
175         //then
176         StepVerifier.create(result)
177                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
178                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
179                 .expectComplete()
180                 .verify(TIMEOUT);
181     }
182
183     @Test
184     void subscriber_shouldSubscribeToTopic(){
185         //given
186         final String topic = "TOPIC4";
187         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
188         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
189         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
190                 CONSUMER_GROUP, CONSUMER_ID);
191
192         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
193                 "{\"differentMessage\":\"message2\"}");
194         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
195         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
196
197         //when
198         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
199         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
200                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
201
202         //then
203         StepVerifier.create(result.take(2))
204                 .expectNext(messages.get(0))
205                 .expectNext(messages.get(1))
206                 .expectComplete()
207                 .verify(TIMEOUT);
208     }
209
210
211
212 }