2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2021 Nokia. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.dcae.common.publishing;
22 import com.google.gson.JsonElement;
23 import io.vavr.collection.List;
24 import io.vavr.control.Option;
25 import org.junit.jupiter.api.Test;
26 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
27 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
28 import org.testcontainers.containers.DockerComposeContainer;
29 import org.testcontainers.junit.jupiter.Container;
30 import org.testcontainers.junit.jupiter.Testcontainers;
31 import reactor.core.publisher.Flux;
32 import reactor.test.StepVerifier;
34 import java.time.Duration;
36 import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
37 import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
41 public class PublisherTest {
44 private final DockerComposeContainer CONTAINER = createContainerInstance();
47 void publishEvent_shouldSuccessfullyPublishSingleMessage() {
49 final Publisher publisher = new Publisher();
50 final String simpleEvent = "{\"message\":\"message1\"}";
51 final List<String> twoJsonMessages = List.of(simpleEvent);
52 final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
55 final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
58 StepVerifier.create(result)
59 .expectNext(expectedResponse)
61 .verify(Duration.ofSeconds(10));
65 private Option<PublisherConfig> createPublishConfig() {
66 List<String> desc = List.of("127.0.0.1:3904");
67 PublisherConfig conf = new PublisherConfig(desc, "topic");
68 return Option.of(conf);
71 private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
72 return ImmutableMessageRouterPublishResponse