f269b942ea07f55a8adc55a37f9eb6c778653111
[dcaegen2/collectors/ves.git] / src / test / java / org / onap / dcae / common / publishing / PublisherTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VES Collector
4  * ================================================================================
5  * Copyright (C) 2021 Nokia. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.dcae.common.publishing;
21
22 import com.google.gson.JsonElement;
23 import io.vavr.collection.List;
24 import io.vavr.control.Option;
25 import org.junit.jupiter.api.Test;
26 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
27 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
28 import org.testcontainers.containers.DockerComposeContainer;
29 import org.testcontainers.junit.jupiter.Container;
30 import org.testcontainers.junit.jupiter.Testcontainers;
31 import reactor.core.publisher.Flux;
32 import reactor.test.StepVerifier;
33
34 import java.time.Duration;
35
36 import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
37 import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
38
39
40 @Testcontainers
41 public class PublisherTest {
42
43     @Container
44     private final DockerComposeContainer CONTAINER = createContainerInstance();
45
46     @Test
47     void publishEvent_shouldSuccessfullyPublishSingleMessage() {
48         //given
49         final Publisher publisher = new Publisher();
50         final String simpleEvent = "{\"message\":\"message1\"}";
51         final List<String> twoJsonMessages = List.of(simpleEvent);
52         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
53
54         //when
55         final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
56
57         //then
58         StepVerifier.create(result)
59                 .expectNext(expectedResponse)
60                 .expectComplete()
61                 .verify(Duration.ofSeconds(10));
62     }
63
64
65     private Option<PublisherConfig> createPublishConfig() {
66         List<String> desc = List.of("127.0.0.1:3904");
67         PublisherConfig conf = new PublisherConfig(desc, "topic");
68         return Option.of(conf);
69     }
70
71     private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
72         return ImmutableMessageRouterPublishResponse
73                 .builder()
74                 .items(items)
75                 .build();
76     }
77
78 }