2 * ============LICENSE_START====================================
3 * DCAEGEN2-SERVICES-SDK
4 * =========================================================
5 * Copyright (C) 2019 Nokia. All rights reserved.
6 * =========================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=====================================
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonParser;
26 import com.google.gson.JsonPrimitive;
29 import java.time.Duration;
30 import java.util.Arrays;
31 import java.util.List;
32 import org.junit.jupiter.api.BeforeAll;
33 import org.junit.jupiter.api.Disabled;
34 import org.junit.jupiter.api.Test;
35 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
36 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
37 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
46 import org.testcontainers.containers.DockerComposeContainer;
47 import org.testcontainers.junit.jupiter.Container;
48 import org.testcontainers.junit.jupiter.Testcontainers;
49 import reactor.core.publisher.Flux;
50 import reactor.core.publisher.Mono;
51 import reactor.test.StepVerifier;
/**
 * Integration test for {@link MessageRouterSubscriber}, exercised against a DMaaP service
 * that is brought up through a Docker Compose file. The whole class is currently switched
 * off by the {@code @Disabled} annotation below; the reason is given in its message.
 */
53 @Disabled("Disabled until fix messages formatting in MessageRouterPublisher::put ")
55 class MessageRouterSubscriberCIT {
// Shared Gson instance, used by getAsJsonArray to build expected payloads.
56 private static final Gson gson = new Gson();
// Ten-second duration. NOTE(review): presumably the upper bound for reactive verification - confirm.
57 private static final Duration TIMEOUT = Duration.ofSeconds(10);
// Port of the DMaaP compose service that is exposed through CONTAINER.
58 private static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
// Payload published by the positive test, first as plain strings ...
59 private static final List<String> messageBatchItems = Arrays.asList("I", "like", "pizza");
// ... and then the same strings wrapped as JsonPrimitive elements of a Flux.
60 private static final Flux<JsonPrimitive> messageBatch = Flux.fromIterable(messageBatchItems)
61 .map(JsonPrimitive::new);
// Consumer group and consumer id placed on every subscribe request built by this class.
62 private static final String CONSUMER_GROUP = "group1";
63 private static final String CONSUMER_ID = "consumer200";
// Name under which the DMaaP service is addressed on CONTAINER.
64 private static final String DMAAP_SERVICE_NAME = "dmaap";
// Classpath resource name of the compose file, and the path it resolves to.
65 private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
66 private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
67 MR_COMPOSE_RESOURCE_NAME);
// Format string for the expected "no such topic" failure text; %s receives the topic name.
68 private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
70 "\"mrstatus\":3001," +
71 "\"helpURL\":\"http://onap.readthedocs.io\"," +
72 "\"message\":\"No such topic exists.-[%s]\"," +
// Compose environment built from DOCKER_COMPOSE_FILE_PATH with the DMaaP service port exposed.
77 private static final DockerComposeContainer CONTAINER = new DockerComposeContainer(
78 new File(DOCKER_COMPOSE_FILE_PATH))
79 .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT);
// Base "/events" URL; not final because it is assigned from CONTAINER's host and port.
81 private static String EVENTS_PATH;
// Publisher created with the default config; used to put messages on a topic.
83 private MessageRouterPublisher publisher = DmaapClientFactory
84 .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
// Subscriber under test, created with the default config.
85 private MessageRouterSubscriber sut = DmaapClientFactory
86 .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
// Build the base "/events" URL from the host and port that CONTAINER reports for the
// DMaaP service's exposed port; the request factories append "/<topic>" to this value.
91 EVENTS_PATH = String.format("http://%s:%d/events",
92 CONTAINER.getServiceHost(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT),
93 CONTAINER.getServicePort(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
/**
 * Subscribes to "newTopic", a topic this test never publishes to or registers, and expects
 * the subscriber to emit a response whose fail reason is the DMaaP 404 text formatted with
 * that topic name.
 */
97 void subscriber_shouldHandleNoSuchTopicException() {
// given: a subscribe request for the topic and the failure response it should produce
99 final String topic = "newTopic";
100 final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(topic);
101 final String expectedFailReason = String.format(DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
102 final MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
104 .failReason(expectedFailReason)
// when: the subscriber under test is asked for messages
108 Mono<MessageRouterSubscribeResponse> response = sut
109 .get(mrSubscribeRequest);
// then: the emitted element must equal the expected failure response
112 StepVerifier.create(response)
113 .expectNext(expectedResponse)
/**
 * Publishes {@code messageBatch} to "TOPIC" and expects a following subscribe call to emit
 * a response whose items equal {@code messageBatchItems} as a JSON array.
 */
119 void subscriber_shouldGetCorrectResponse() {
// given: publish and subscribe requests for the same topic, plus the expected response
121 final String topic = "TOPIC";
122 final MessageRouterPublishRequest publishRequest = createMRPublishRequest(topic, "text/plain");
123 final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topic);
124 final JsonArray expectedItems = getAsJsonArray(messageBatchItems);
125 final ImmutableMessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
127 .items(expectedItems)
// when: registerTopic first does a blocking sample publish and fetch on the topic;
// the real batch is then published and, once that completes, the subscriber is queried
131 registerTopic(publishRequest, subscribeRequest);
132 Mono<MessageRouterSubscribeResponse> response = publisher
133 .put(publishRequest, messageBatch)
134 .then(sut.get(subscribeRequest));
// then: the emitted element must equal the expected response
137 StepVerifier.create(response)
138 .expectNext(expectedResponse)
143 private static String getDockerComposeFilePath(String resourceName){
144 URL resource = MessageRouterSubscriberCIT.class.getClassLoader()
145 .getResource(resourceName);
147 if(resource != null) return resource.getFile();
148 else throw new DockerComposeNotFoundException(String
149 .format("File %s does not exist", resourceName));
/**
 * Builds a publish request for the given topic.
 *
 * @param topic topic name; appended to EVENTS_PATH to form the sink's topic URL
 * @param contentType content type set on the publish request
 * @return publish request carrying the sink definition and content type
 */
152 private static MessageRouterPublishRequest createMRPublishRequest(String topic,
153 String contentType) {
// Sink definition whose topic URL is "<EVENTS_PATH>/<topic>".
154 MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
156 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
159 return ImmutableMessageRouterPublishRequest.builder()
160 .sinkDefinition(sinkDefinition)
161 .contentType(contentType)
/**
 * Builds a subscribe request for the given topic, using this class's fixed
 * CONSUMER_GROUP and CONSUMER_ID.
 *
 * @param topic topic name; appended to EVENTS_PATH to form the source's topic URL
 * @return subscribe request carrying the source definition, consumer group and consumer id
 */
165 private MessageRouterSubscribeRequest createMRSubscribeRequest(String topic) {
// Source definition whose topic URL is "<EVENTS_PATH>/<topic>".
166 ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
168 .topicUrl(String.format("%s/%s", EVENTS_PATH, topic))
171 return ImmutableMessageRouterSubscribeRequest
173 .sourceDefinition(sourceDefinition)
174 .consumerGroup(CONSUMER_GROUP)
175 .consumerId(CONSUMER_ID)
179 private void registerTopic(MessageRouterPublishRequest publishRequest,
180 MessageRouterSubscribeRequest subscribeRequest) {
181 Flux<JsonPrimitive> sampleMessage = Flux.just("sample message").map(JsonPrimitive::new);
183 publisher.put(publishRequest, sampleMessage).blockLast();
184 sut.get(subscribeRequest).block();
187 private JsonArray getAsJsonArray(List<String> list) {
188 String listsJsonString = gson.toJson(list);
189 return new JsonParser().parse(listsJsonString).getAsJsonArray();