a758b2de30aae5d97aa85c142953049c19623200
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
24
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonObject;
27 import io.vavr.collection.List;
28 import java.time.Duration;
29 import org.junit.jupiter.api.BeforeAll;
30 import org.junit.jupiter.api.Test;
31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
36 import org.testcontainers.containers.DockerComposeContainer;
37 import org.testcontainers.junit.jupiter.Container;
38 import org.testcontainers.junit.jupiter.Testcontainers;
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.Mono;
41 import reactor.test.StepVerifier;
42
43 @Testcontainers
44 class MessageRouterSubscriberIT {
45     private static final Duration TIMEOUT = Duration.ofSeconds(10);
46     private static final String CONSUMER_GROUP = "group1";
47     private static final String CONSUMER_ID = "consumer200";
48     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
49             "{" +
50             "\"mrstatus\":3001," +
51             "\"helpURL\":\"http://onap.readthedocs.io\"," +
52             "\"message\":\"No such topic exists.-[%s]\"," +
53             "\"status\":404" +
54             "}";
55
56     @Container
57     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
58
59     private static String EVENTS_PATH;
60
61     private MessageRouterPublisher publisher = DmaapClientFactory
62             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
63     private MessageRouterSubscriber subscriber = DmaapClientFactory
64             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
65
66
67     @BeforeAll
68     static void setUp() {
69         EVENTS_PATH = String.format("http://%s:%d/events",
70                 CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
71                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
72                 CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
73                         DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
74     }
75
76     @Test
77     void subscriber_shouldHandleNoSuchTopicException() {
78         //given
79         final String topic = "newTopic";
80         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
81                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
82         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
83                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
84
85         //when
86         Mono<MessageRouterSubscribeResponse> response = subscriber
87                 .get(mrSubscribeRequest);
88
89         //then
90         StepVerifier.create(response)
91                 .expectNext(expectedResponse)
92                 .expectComplete()
93                 .verify(TIMEOUT);
94     }
95
96     @Test
97     void subscriberShouldHandleSingleItemResponse(){
98         //given
99         final String topic = "TOPIC";
100         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
101         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
102         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
103
104         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
105         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
106         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
107         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
108
109         //when
110         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
111         Mono<MessageRouterSubscribeResponse> response = publisher
112                 .put(publishRequest, jsonMessageBatch)
113                 .then(subscriber.get(subscribeRequest));
114
115         //then
116         StepVerifier.create(response)
117                 .expectNext(expectedResponse)
118                 .expectComplete()
119                 .verify();
120     }
121
122     @Test
123     void subscriber_shouldHandleMultipleItemsResponse() {
124         //given
125         final String topic = "TOPIC2";
126         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
127         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
128         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
129
130         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
131                 "{\"differentMessage\":\"message2\"}");
132         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
133         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
134         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
135
136         //when
137         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
138         Mono<MessageRouterSubscribeResponse> response = publisher
139                 .put(publishRequest, jsonMessageBatch)
140                 .then(subscriber.get(subscribeRequest));
141
142         //then
143         StepVerifier.create(response)
144                 .expectNext(expectedResponse)
145                 .expectComplete()
146                 .verify();
147     }
148
149     @Test
150     void subscriber_shouldExtractItemsFromResponse() {
151         //given
152         final String topic = "TOPIC3";
153         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
154         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
155         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
156                 CONSUMER_GROUP, CONSUMER_ID);
157
158         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
159                 "{\"differentMessage\":\"message2\"}");
160         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
161
162         //when
163         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
164         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
165                 .thenMany(subscriber.getElements(subscribeRequest));
166
167         //then
168         StepVerifier.create(result)
169                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
170                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
171                 .expectComplete()
172                 .verify(TIMEOUT);
173     }
174
175     @Test
176     void subscriber_shouldSubscribeToTopic(){
177         //given
178         final String topic = "TOPIC4";
179         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
180         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
181         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
182                 CONSUMER_GROUP, CONSUMER_ID);
183
184         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
185                 "{\"differentMessage\":\"message2\"}");
186         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
187         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
188
189         //when
190         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
191         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
192                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
193
194         //then
195         StepVerifier.create(result.take(2))
196                 .expectNext(messages.get(0))
197                 .expectNext(messages.get(1))
198                 .expectComplete()
199                 .verify(TIMEOUT);
200     }
201
202
203
204 }