2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.ArgumentMatchers.anyString;
29 import static org.mockito.Mockito.doReturn;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.verifyNoMoreInteractions;
35 import ch.qos.logback.classic.spi.ILoggingEvent;
36 import ch.qos.logback.core.read.ListAppender;
38 import com.google.gson.Gson;
39 import com.google.gson.GsonBuilder;
40 import com.google.gson.JsonObject;
42 import java.io.IOException;
43 import java.nio.charset.Charset;
44 import java.util.Optional;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
50 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
51 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import org.springframework.http.HttpHeaders;
55 import org.springframework.http.HttpStatus;
56 import org.springframework.http.ResponseEntity;
57 import org.springframework.web.reactive.function.client.WebClientResponseException;
59 import reactor.core.publisher.Mono;
60 import reactor.test.StepVerifier;
62 class DmaapMessageHandlerTest {
63 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
64 private static final String URL = "url";
66 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
67 private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
68 private DmaapMessageHandler testedObject;
69 private static Gson gson = new GsonBuilder() //
73 private void setUp() throws Exception {
74 testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
77 static JsonObject payloadAsJson() {
78 return gson.fromJson(payloadAsString(), JsonObject.class);
81 static String payloadAsString() {
82 return "{\"param\":\"value\"}";
85 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
86 Optional<JsonObject> payload =
87 ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
89 return ImmutableDmaapRequestMessage.builder() //
90 .apiVersion("apiVersion") //
91 .correlationId("correlationId") //
92 .operation(operation) //
93 .originatorId("originatorId") //
95 .requestId("requestId") //
97 .timestamp("timestamp") //
102 private String dmaapInputMessage(Operation operation) {
103 return gson.toJson(dmaapRequestMessage(operation));
106 private Mono<ResponseEntity<String>> okResponse() {
107 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
108 return Mono.just(entity);
111 private Mono<ResponseEntity<String>> notOkResponse() {
112 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
113 return Mono.just(entity);
117 void testMessageParsing() {
118 String message = dmaapInputMessage(Operation.DELETE);
119 logger.info(message);
120 DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
121 assertNotNull(parsedMessage);
122 assertFalse(parsedMessage.payload().isPresent());
124 message = dmaapInputMessage(Operation.PUT);
125 logger.info(message);
126 parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
127 assertNotNull(parsedMessage);
128 assertTrue(parsedMessage.payload().isPresent());
132 void unparseableMessage_thenWarning() {
133 final ListAppender<ILoggingEvent> logAppender =
134 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
136 String msg = "bad message";
137 testedObject.handleDmaapMsg(msg);
139 assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
140 "handleDmaapMsg failure org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException: Received unparsable "
141 + "message from DMAAP: \"" + msg + "\", reason: ");
145 void successfulDelete() throws IOException {
146 doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
147 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
149 String message = dmaapInputMessage(Operation.DELETE);
152 .create(testedObject.createTask(message)) //
153 .expectSubscription() //
155 .verifyComplete(); //
157 verify(pmsClient).deleteForEntity(URL);
158 verifyNoMoreInteractions(pmsClient);
160 verify(dmaapClient).post(anyString(), anyString());
162 verifyNoMoreInteractions(dmaapClient);
166 void successfulGet() throws IOException {
167 doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
168 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
171 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
172 .expectSubscription() //
174 .verifyComplete(); //
176 verify(pmsClient).getForEntity(URL);
177 verifyNoMoreInteractions(pmsClient);
179 verify(dmaapClient).post(anyString(), anyString());
180 verifyNoMoreInteractions(dmaapClient);
184 void exceptionFromPmsWhenGet_thenPostError() throws IOException {
185 String errorBody = "Unavailable";
186 WebClientResponseException webClientResponseException = new WebClientResponseException(
187 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
188 doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
189 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
192 .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
193 .expectSubscription() //
194 .verifyComplete(); //
196 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
197 verify(dmaapClient).post(anyString(), captor.capture());
198 String actualMessage = captor.getValue();
199 assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
200 .contains(errorBody);
204 void successfulPut() throws IOException {
205 doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
206 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
209 .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
210 .expectSubscription() //
212 .verifyComplete(); //
214 verify(pmsClient).putForEntity(URL, payloadAsString());
215 verifyNoMoreInteractions(pmsClient);
217 verify(dmaapClient).post(anyString(), anyString());
218 verifyNoMoreInteractions(dmaapClient);
222 void successfulPost() throws IOException {
223 doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
224 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
227 .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
228 .expectSubscription() //
230 .verifyComplete(); //
232 verify(pmsClient).postForEntity(URL, payloadAsString());
233 verifyNoMoreInteractions(pmsClient);
235 verify(dmaapClient).post(anyString(), anyString());
236 verifyNoMoreInteractions(dmaapClient);
240 void exceptionWhenCallingPms_thenNotFoundResponse() throws IOException {
242 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
243 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
245 testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
247 verify(pmsClient).putForEntity(anyString(), anyString());
248 verifyNoMoreInteractions(pmsClient);
250 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
251 verify(dmaapClient).post(anyString(), captor.capture());
252 String actualMessage = captor.getValue();
253 assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
254 .contains(HttpStatus.BAD_GATEWAY.toString());
256 verifyNoMoreInteractions(dmaapClient);
260 void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
261 String message = dmaapInputMessage(Operation.PUT).toString();
262 String badOperation = "BAD";
263 message = message.replace(Operation.PUT.toString(), badOperation);
265 testedObject.handleDmaapMsg(message);
267 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
268 verify(dmaapClient).post(anyString(), captor.capture());
269 String actualMessage = captor.getValue();
270 assertThat(actualMessage).contains("Not implemented operation") //
271 .contains("BAD_REQUEST");
275 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
276 String message = dmaapInputMessage(Operation.PUT).toString();
277 message = message.replace("payload", "junk");
279 final ListAppender<ILoggingEvent> logAppender =
280 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
282 testedObject.handleDmaapMsg(message);
284 assertThat(logAppender.list.get(0).getFormattedMessage())
285 .startsWith("Expected payload in message from DMAAP: ");