2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import ch.qos.logback.classic.spi.ILoggingEvent;
33 import ch.qos.logback.core.read.ListAppender;
35 import com.google.gson.Gson;
36 import com.google.gson.GsonBuilder;
37 import com.google.gson.JsonObject;
39 import java.io.IOException;
40 import java.nio.charset.Charset;
41 import java.util.Optional;
43 import org.junit.jupiter.api.BeforeEach;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.ArgumentCaptor;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
47 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
48 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.springframework.http.HttpHeaders;
52 import org.springframework.http.HttpStatus;
53 import org.springframework.http.ResponseEntity;
54 import org.springframework.web.reactive.function.client.WebClientResponseException;
56 import reactor.core.publisher.Mono;
57 import reactor.test.StepVerifier;
// Tests for DmaapMessageHandler. Both REST clients handed to the handler are Mockito mocks,
// so the tests only observe which client methods the handler calls and with which arguments.
59 class DmaapMessageHandlerTest {
// NOTE(review): this logger is not referenced in the visible part of the file - confirm it is still needed.
60 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
// Placeholder URL value that the success tests expect to be forwarded to the PMS client.
61 private static final String URL = "url";
// Mock passed as the first constructor argument of DmaapMessageHandler; the tests stub and verify post(...) on it.
63 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
// Mock passed as the second constructor argument; the tests stub and verify the *ForEntity(...) calls on it.
64 private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
// Handler under test; assigned in setUp().
65 private DmaapMessageHandler testedObject;
// Gson instance used by payloadAsJson() to parse the sample payload.
66 private Gson gson = new GsonBuilder().create(); //
// Creates the handler under test from the two mocked clients and wraps it in a Mockito spy.
// NOTE(review): the annotation line is not visible here; presumably this is the @BeforeEach
// fixture (BeforeEach is imported) - confirm. JUnit Jupiter documents that @BeforeEach methods
// must not be private, so verify that this method is actually discovered and run.
69 private void setUp() throws Exception {
70 testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
// Returns the sample payload from payloadAsString() parsed into a Gson JsonObject.
73 JsonObject payloadAsJson() {
74 return gson.fromJson(payloadAsString(), JsonObject.class);
// Returns the fixed JSON text used as the request payload in the PUT and POST tests.
77 String payloadAsString() {
78 return "{\"param\":\"value\"}";
// Builds a DmaapRequestMessage for the given operation, filling the string fields with
// fixed placeholder values. For PUT and POST the payload Optional holds the sample JSON object.
// NOTE(review): the else arm of the conditional and the remaining builder calls (payload,
// url/target, build) are not visible in this extract - presumably an empty Optional is used
// for the other operations and URL is set on the message; confirm against the full file.
81 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
82 Optional<JsonObject> payload =
83 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
85 return ImmutableDmaapRequestMessage.builder() //
86 .apiVersion("apiVersion") //
87 .correlationId("correlationId") //
88 .operation(operation) //
89 .originatorId("originatorId") //
91 .requestId("requestId") //
93 .timestamp("timestamp") //
// Returns a Mono emitting a ResponseEntity with body "OK" and status 200 OK;
// used to stub a successful PMS call.
98 private Mono<ResponseEntity<String>> okResponse() {
99 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
100 return Mono.just(entity);
// Returns a Mono emitting a ResponseEntity with body "NOK" and status 502 BAD_GATEWAY.
// The Mono itself completes normally; the failure is carried only by the status code.
103 private Mono<ResponseEntity<String>> notOkResponse() {
104 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
105 return Mono.just(entity);
// DELETE request: the PMS client answers with an OK response and the handler's task is
// expected to complete normally.
// NOTE(review): the receiver of .create(...) and any expectNext step are missing from this
// extract - presumably StepVerifier (imported); confirm against the full file.
109 void successfulDelete() throws IOException {
// Stub both clients: PMS delete succeeds, posting the answer to DMaaP succeeds.
110 doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
111 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
113 DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
116 .create(testedObject.createTask(message)) //
117 .expectSubscription() //
119 .verifyComplete(); //
// The PMS client must have been asked to delete exactly URL, and nothing else.
121 verify(pmsClient).deleteForEntity(URL);
122 verifyNoMoreInteractions(pmsClient);
// Exactly one post to the DMaaP client, with any arguments.
124 verify(dmaapClient).post(anyString(), anyString());
126 verifyNoMoreInteractions(dmaapClient);
// GET request: the PMS client answers with an OK response and the handler's task is
// expected to complete normally.
// NOTE(review): the receiver of .create(...) and any expectNext step are missing from this
// extract - presumably StepVerifier (imported); confirm against the full file.
130 void successfulGet() throws IOException {
// Stub both clients: PMS get succeeds, posting the answer to DMaaP succeeds.
131 doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
132 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
134 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
136 .create(testedObject.createTask(message)) //
137 .expectSubscription() //
139 .verifyComplete(); //
// The PMS client must have been queried with exactly URL, and nothing else.
141 verify(pmsClient).getForEntity(URL);
142 verifyNoMoreInteractions(pmsClient);
// Exactly one post to the DMaaP client, with any arguments.
144 verify(dmaapClient).post(anyString(), anyString());
145 verifyNoMoreInteractions(dmaapClient);
// GET request where the PMS client fails with a WebClientResponseException (503, body
// "Unavailable"): the task must still complete, and the text posted to the DMaaP client must
// contain both the 503 status string and the error body.
// NOTE(review): the receiver of .create(...) is missing from this extract - presumably
// StepVerifier (imported); confirm against the full file.
149 void exceptionFromPmsWhenGet_thenPostError() throws IOException {
150 String errorBody = "Unavailable";
// Headers and charset are deliberately passed as null; the casts pick the constructor overload.
// NOTE(review): getBytes() without a charset uses the platform default - harmless for this
// ASCII text, but consider an explicit charset.
151 WebClientResponseException webClientResponseException = new WebClientResponseException(
152 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
// The PMS call yields an error signal; posting to DMaaP succeeds.
153 doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
154 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
156 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
158 .create(testedObject.createTask(message)) //
159 .expectSubscription() //
160 .verifyComplete(); //
// Capture the second argument of the single post(...) call and inspect its text.
162 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
163 verify(dmaapClient).post(anyString(), captor.capture());
164 String actualMessage = captor.getValue();
165 assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
166 .contains(errorBody);
// PUT request: the PMS client answers with an OK response and the handler's task is
// expected to complete normally.
// NOTE(review): the receiver of .create(...) and any expectNext step are missing from this
// extract - presumably StepVerifier (imported); confirm against the full file.
170 void successfulPut() throws IOException {
// Stub both clients: PMS put succeeds, posting the answer to DMaaP succeeds.
171 doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
172 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
174 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
176 .create(testedObject.createTask(message)) //
177 .expectSubscription() //
179 .verifyComplete(); //
// The PMS client must have received URL together with the sample payload as a string.
181 verify(pmsClient).putForEntity(URL, payloadAsString());
182 verifyNoMoreInteractions(pmsClient);
// Exactly one post to the DMaaP client, with any arguments.
184 verify(dmaapClient).post(anyString(), anyString());
185 verifyNoMoreInteractions(dmaapClient);
// POST request: the PMS client answers with an OK response and the handler's task is
// expected to complete normally.
// NOTE(review): the receiver of .create(...) and any expectNext step are missing from this
// extract - presumably StepVerifier (imported); confirm against the full file.
189 void successfulPost() throws IOException {
// Stub both clients: PMS post succeeds, posting the answer to DMaaP succeeds.
190 doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
191 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
193 DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
195 .create(testedObject.createTask(message)) //
196 .expectSubscription() //
198 .verifyComplete(); //
// The PMS client must have received URL together with the sample payload as a string.
200 verify(pmsClient).postForEntity(URL, payloadAsString());
201 verifyNoMoreInteractions(pmsClient);
// Exactly one post to the DMaaP client, with any arguments.
203 verify(dmaapClient).post(anyString(), anyString());
204 verifyNoMoreInteractions(dmaapClient);
// PUT request where the PMS client returns a 502 BAD_GATEWAY response: the text posted to the
// DMaaP client must contain the BAD_GATEWAY status string.
// Despite the method name, the stub emits a non-OK ResponseEntity, not an exception.
208 void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
210 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
211 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
213 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
// Runs the task synchronously; block() waits for the Mono to finish.
214 testedObject.createTask(message).block();
// Exactly one put on the PMS client, with any arguments.
216 verify(pmsClient).putForEntity(anyString(), anyString());
217 verifyNoMoreInteractions(pmsClient);
// Capture the second argument of the single post(...) call and inspect its text.
219 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
220 verify(dmaapClient).post(anyString(), captor.capture());
221 String actualMessage = captor.getValue();
222 assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
223 .contains(HttpStatus.BAD_GATEWAY.toString());
225 verifyNoMoreInteractions(dmaapClient);
// PUT request built with an empty payload: the handler is expected to log a warning whose
// text starts with "Expected payload in message from DMAAP: ".
// NOTE(review): the method name also mentions a "not found" response, but no assertion on a
// response is visible in this extract, and the end of the builder chain is missing - confirm
// against the full file.
229 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
// Built by hand instead of via dmaapRequestMessage(...) so that the payload can be empty for a PUT.
230 DmaapRequestMessage message = ImmutableDmaapRequestMessage.builder() //
231 .apiVersion("apiVersion") //
232 .correlationId("correlationId") //
233 .operation(DmaapRequestMessage.Operation.PUT) //
234 .originatorId("originatorId") //
235 .payload(Optional.empty()) //
236 .requestId("requestId") //
238 .timestamp("timestamp") //
// Obtain a list appender for the DmaapMessageHandler logger at WARN, so logged events can be inspected.
242 final ListAppender<ILoggingEvent> logAppender =
243 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
245 testedObject.handleDmaapMsg(message);
// Only the first captured log event is checked.
247 assertThat(logAppender.list.get(0).getFormattedMessage())
248 .startsWith("Expected payload in message from DMAAP: ");