[DCAEGEN2] Remove DMaaP dependency in VES-Collector
[dcaegen2/collectors/ves.git] / src / test / java / org / onap / dcae / common / publishing / PublisherTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VES Collector
4  * ================================================================================
5  * Copyright (C) 2021 Nokia. All rights reserved.
6  * Copyright (C) 2023 AT&T Intellectual Property. All rights reserved.
7  * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22 package org.onap.dcae.common.publishing;
23
import com.google.gson.JsonElement;
import io.vavr.collection.List;
import io.vavr.control.Option;
import org.junit.Assume;
import org.junit.Before;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;

import java.time.Duration;
import java.util.Locale;

import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
47
48 @ExtendWith(SystemStubsExtension.class)
49 @Testcontainers(disabledWithoutDocker = true)
50 public class PublisherTest  {
51     
52     @SystemStub
53     EnvironmentVariables environmentVariables = new EnvironmentVariables();
54     
55     @Container
56     private final DockerComposeContainer CONTAINER = createContainerInstance();
57         
58     @Before
59     public void linuxOnly() {
60         Assume.assumeFalse
61         (System.getProperty("os.name").toLowerCase().startsWith("win"));
62     }
63
64     @Test
65     public void publishEvent_shouldSuccessfullyPublishSingleMessage() {
66         //given
67         environmentVariables
68         .set("BOOTSTRAP_SERVERS", "localhost:9092");
69         final Publisher publisher = new Publisher();
70         final String simpleEvent = "{\"message\":\"message1\"}";
71         final List<String> twoJsonMessages = List.of(simpleEvent);
72         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
73
74         //when
75         final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
76
77         //then
78         StepVerifier.create(result)
79                 .expectNext(expectedResponse)
80                 .expectComplete()
81                 .verify(Duration.ofSeconds(10));
82     }
83
84
85     private Option<PublisherConfig> createPublishConfig() {
86         List<String> desc = List.of("127.0.0.1:3904");
87         PublisherConfig conf = new PublisherConfig(desc, "topic");
88         return Option.of(conf);
89     }
90
91     private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
92         return ImmutableMessageRouterPublishResponse
93                 .builder()
94                 .items(items)
95                 .build();
96     }
97
98 }