2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import ch.qos.logback.classic.spi.ILoggingEvent;
33 import ch.qos.logback.core.read.ListAppender;
35 import com.google.gson.Gson;
36 import com.google.gson.GsonBuilder;
37 import com.google.gson.JsonObject;
39 import java.io.IOException;
40 import java.nio.charset.Charset;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.ArgumentCaptor;
46 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
47 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
48 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
49 import org.springframework.http.HttpHeaders;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.http.ResponseEntity;
52 import org.springframework.web.reactive.function.client.WebClientResponseException;
54 import reactor.core.publisher.Mono;
55 import reactor.test.StepVerifier;
57 class DmaapMessageHandlerTest {
58 private static final String URL = "url";
60 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
61 private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
62 private DmaapMessageHandler testedObject;
63 private Gson gson = new GsonBuilder().create(); //
66 private void setUp() throws Exception {
67 testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
70 JsonObject payloadAsJson() {
71 return gson.fromJson(payloadAsString(), JsonObject.class);
74 String payloadAsString() {
75 return "{\"param\":\"value\"}";
78 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
79 JsonObject payload = ((operation == Operation.PUT || operation == Operation.POST) ? payloadAsJson() : null);
80 return DmaapRequestMessage.builder() //
81 .apiVersion("apiVersion") //
82 .correlationId("correlationId") //
83 .operation(operation) //
84 .originatorId("originatorId") //
86 .requestId("requestId") //
88 .timestamp("timestamp") //
93 private Mono<ResponseEntity<String>> okResponse() {
94 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
95 return Mono.just(entity);
98 private Mono<ResponseEntity<String>> notOkResponse() {
99 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
100 return Mono.just(entity);
104 @DisplayName("test successful Delete")
105 void successfulDelete() throws IOException {
106 doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
107 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
109 DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
112 .create(testedObject.handleDmaapMsg(message)) //
113 .expectSubscription() //
115 .verifyComplete(); //
117 verify(pmsClient).deleteForEntity(URL);
118 verifyNoMoreInteractions(pmsClient);
120 verify(dmaapClient).post(anyString(), anyString());
122 verifyNoMoreInteractions(dmaapClient);
126 @DisplayName("test successful Get")
127 void successfulGet() throws IOException {
128 doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
129 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
131 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
133 .create(testedObject.handleDmaapMsg(message)) //
134 .expectSubscription() //
136 .verifyComplete(); //
138 verify(pmsClient).getForEntity(URL);
139 verifyNoMoreInteractions(pmsClient);
141 verify(dmaapClient).post(anyString(), anyString());
142 verifyNoMoreInteractions(dmaapClient);
146 @DisplayName("test exception From Pms When Get then Post Error")
147 void exceptionFromPmsWhenGet_thenPostError() throws IOException {
148 String errorBody = "Unavailable";
149 WebClientResponseException webClientResponseException = new WebClientResponseException(
150 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
151 doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
152 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
154 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
156 .create(testedObject.handleDmaapMsg(message)) //
157 .expectSubscription() //
158 .verifyComplete(); //
160 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
161 verify(dmaapClient).post(anyString(), captor.capture());
162 String actualMessage = captor.getValue();
163 assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
164 .contains(errorBody);
168 @DisplayName("test successful Put")
169 void successfulPut() throws IOException {
170 doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
171 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
173 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
175 .create(testedObject.handleDmaapMsg(message)) //
176 .expectSubscription() //
178 .verifyComplete(); //
180 verify(pmsClient).putForEntity(URL, payloadAsString());
181 verifyNoMoreInteractions(pmsClient);
183 verify(dmaapClient).post(anyString(), anyString());
184 verifyNoMoreInteractions(dmaapClient);
188 @DisplayName("test successful Post")
189 void successfulPost() throws IOException {
190 doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
191 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
193 DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
195 .create(testedObject.handleDmaapMsg(message)) //
196 .expectSubscription() //
198 .verifyComplete(); //
200 verify(pmsClient).postForEntity(URL, payloadAsString());
201 verifyNoMoreInteractions(pmsClient);
203 verify(dmaapClient).post(anyString(), anyString());
204 verifyNoMoreInteractions(dmaapClient);
208 @DisplayName("test exception When Calling Pms then Error Response")
209 void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
211 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
212 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
214 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
215 testedObject.handleDmaapMsg(message).block();
217 verify(pmsClient).putForEntity(anyString(), anyString());
218 verifyNoMoreInteractions(pmsClient);
220 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
221 verify(dmaapClient).post(anyString(), captor.capture());
222 String actualMessage = captor.getValue();
223 assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
224 .contains(HttpStatus.BAD_GATEWAY.toString());
226 verifyNoMoreInteractions(dmaapClient);
230 @DisplayName("test put Without Payload then Not Found Response With Warning")
231 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
232 DmaapRequestMessage message = DmaapRequestMessage.builder() //
233 .apiVersion("apiVersion") //
234 .correlationId("correlationId") //
235 .operation(DmaapRequestMessage.Operation.PUT) //
236 .originatorId("originatorId") //
238 .requestId("requestId") //
240 .timestamp("timestamp") //
244 final ListAppender<ILoggingEvent> logAppender =
245 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
247 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
248 testedObject.handleDmaapMsg(message).block();
250 assertThat(logAppender.list.get(0).getFormattedMessage())
251 .startsWith("Expected payload in message from DMAAP: ");