5b113503e73ba5229785c3a01ae02c031f3f8dd2
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonParser;
26 import com.google.gson.JsonPrimitive;
27 import java.io.File;
28 import java.net.URL;
29 import java.time.Duration;
30 import java.util.Arrays;
31 import java.util.List;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Disabled;
34 import org.junit.jupiter.api.Test;
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
46 import org.testcontainers.containers.DockerComposeContainer;
47 import org.testcontainers.junit.jupiter.Container;
48 import org.testcontainers.junit.jupiter.Testcontainers;
49 import reactor.core.publisher.Flux;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
52
53 @Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ")
54 @Testcontainers
55 class MessageRouterSubscriberCIT {
56     private static final Gson gson = new Gson();
57     private static final Duration TIMEOUT = Duration.ofSeconds(10);
58     private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
59     private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza");
60     private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems)
61             .map(JsonPrimitive::new);
62     private static final String CONSUMER_GROUP = "group1";
63     private static final String CONSUMER_ID = "consumer200";
64     private static final String DMAAP_SERVICE_NAME = "dmaap";
65     private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
66     private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
67             MR_COMPOSE_RESOURCE_NAME);
68     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
69             "{" +
70             "\"mrstatus\":3001," +
71             "\"helpURL\":\"http://onap.readthedocs.io\"," +
72             "\"message\":\"No such topic exists.-[%s]\"," +
73             "\"status\":404" +
74             "}";
75
76     @Container
77     private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
78             new File(DOCKER_COMPOSE_FILE_PATH))
79             .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
80
81     private static String EVENTS_PATH;
82
83     private MessageRouterPublisher publisher = DmaapClientFactory
84             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
85     private MessageRouterSubscriber sut = DmaapClientFactory
86             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
87
88
89     @BeforeAll
90     static void setUp() {
91         EVENTS_PATH = String.format("http://%s:%d/events",
92                 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
93                 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
94     }
95
96     @Test
97     void subscriber_shouldHandleNoSuchTopicException() {
98         //given
99         final String topic = "newTopic";
100         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
101         final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
102         final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
103                 .builder()
104                 .failReason(expectedFailReason)
105                 .build();
106
107         //when
108         Mono<MessageRouterSubscribeResponse> response = sut
109                 .get(mrSubscribeRequest);
110
111         //then
112         StepVerifier.create(response)
113                 .expectNext(expectedResponse)
114                 .expectComplete()
115                 .verify(TIMEOUT);
116     }
117
118     @Test
119     void subscriber_shouldGetCorrectResponse() {
120         //given
121         final String topic = "TOPIC";
122         final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain");
123         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
124         final JsonArray expectedItems = getAsJsonArray(messageBatchItems);
125         final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
126                 .builder()
127                 .items(expectedItems)
128                 .build();
129
130         //when
131         registerTopic(publishRequest, subscribeRequest);
132         Mono<MessageRouterSubscribeResponse> response = publisher
133                 .put(publishRequest, messageBatch)
134                 .then(sut.get(subscribeRequest));
135
136         //then
137         StepVerifier.create(response)
138                 .expectNext(expectedResponse)
139                 .expectComplete()
140                 .verify();
141     }
142
143     private static String getDockerComposeFilePath(String resourceName){
144         URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
145                 .getResource(resourceName);
146
147         if(resource != null) return resource.getFile();
148         else throw new DockerComposeNotFoundException(String
149                 .format("File %s does not exist", resourceName));
150     }
151
152     private static MessageRouterPublishRequest createMRPublishRequest(String topic,
153             String contentType) {
154         MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
155                 .name("the topic")
156                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
157                 .build();
158
159         return ImmutableMessageRouterPublishRequest.builder()
160                 .sinkDefinition(sinkDefinition)
161                 .contentType(contentType)
162                 .build();
163     }
164
165     private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
166         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
167                 .name("the topic")
168                 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
169                 .build();
170
171         return ImmutableMessageRouterSubscribeRequest
172                 .builder()
173                 .sourceDefinition(sourceDefinition)
174                 .consumerGroup(CONSUMER_GROUP)
175                 .consumerId(CONSUMER_ID)
176                 .build();
177     }
178
179     private void registerTopic(MessageRouterPublishRequest publishRequest,
180             MessageRouterSubscribeRequest subscribeRequest) {
181         Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new);
182
183         publisher.put(publishRequest, sampleMessage).blockLast();
184         sut.get(subscribeRequest).block();
185     }
186
187     private JsonArray getAsJsonArray(List<String> list) {
188         String listsJsonString = gson.toJson(list);
189         return new JsonParser().parse(listsJsonString).getAsJsonArray();
190     }
191 }