2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
23 import static ch.qos.logback.classic.Level.WARN;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import ch.qos.logback.classic.spi.ILoggingEvent;
33 import ch.qos.logback.core.read.ListAppender;
35 import com.google.gson.Gson;
36 import com.google.gson.GsonBuilder;
37 import com.google.gson.JsonObject;
39 import java.io.IOException;
40 import java.nio.charset.Charset;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.mockito.ArgumentCaptor;
45 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
46 import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
47 import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
48 import org.springframework.http.HttpHeaders;
49 import org.springframework.http.HttpStatus;
50 import org.springframework.http.ResponseEntity;
51 import org.springframework.web.reactive.function.client.WebClientResponseException;
53 import reactor.core.publisher.Mono;
54 import reactor.test.StepVerifier;
56 class DmaapMessageHandlerTest {
// Shared fixtures used by the tests in this class.
// URL is the value the verifications below expect the PMS client to be called with.
57 private static final String URL = "url";
// Mockito mocks for the two REST clients handed to the handler under test.
59 private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
60 private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
// Assigned in setUp().
61 private DmaapMessageHandler testedObject;
// Gson instance used by payloadAsJson() to parse the sample payload.
62 private Gson gson = new GsonBuilder().create(); //
// Creates the object under test: a Mockito spy wrapping a DmaapMessageHandler
// constructed from the mocked dmaapClient and pmsClient.
// NOTE(review): the embedded numbering skips the line directly above this method;
// presumably a JUnit lifecycle annotation (BeforeEach is imported) sits there - confirm.
65 private void setUp() throws Exception {
66 testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
// Returns the sample payload from payloadAsString() parsed into a Gson JsonObject.
69 JsonObject payloadAsJson() {
70 return gson.fromJson(payloadAsString(), JsonObject.class);
// Returns the sample payload as a JSON string: a single "param" -> "value" member.
73 String payloadAsString() {
74 return "{\"param\":\"value\"}";
// Builds a DmaapRequestMessage for the given operation using fixed placeholder field values.
// The local payload is the sample JSON object for PUT and POST, and null for every other operation.
// NOTE(review): the embedded numbering skips lines inside and after this builder chain
// (e.g. between originatorId and requestId, and after timestamp), so further builder
// calls, including whatever consumes `payload` and terminates the chain, are not
// visible here - confirm against the complete file.
77 DmaapRequestMessage dmaapRequestMessage(Operation operation) {
78 JsonObject payload = ((operation == Operation.PUT || operation == Operation.POST) ? payloadAsJson() : null);
79 return DmaapRequestMessage.builder() //
80 .apiVersion("apiVersion") //
81 .correlationId("correlationId") //
82 .operation(operation) //
83 .originatorId("originatorId") //
85 .requestId("requestId") //
87 .timestamp("timestamp") //
// Returns a Mono emitting a ResponseEntity with body "OK" and status 200 (HttpStatus.OK).
92 private Mono<ResponseEntity<String>> okResponse() {
93 ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
94 return Mono.just(entity);
// Returns a Mono emitting a ResponseEntity with body "NOK" and status 502 (HttpStatus.BAD_GATEWAY).
// The entity is emitted as a normal value (Mono.just), not as an error signal.
97 private Mono<ResponseEntity<String>> notOkResponse() {
98 ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
99 return Mono.just(entity);
// Handles a DELETE message with both clients stubbed to succeed, then checks that
// the PMS client received exactly one deleteForEntity(URL) call and the DMaaP client
// exactly one post(...) call.
// NOTE(review): the embedded numbering skips lines around the reactive chain below
// (its head and at least one step are not visible) - confirm against the complete file.
103 void successfulDelete() throws IOException {
// Stub: PMS answers with a 200 entity, DMaaP post emits "OK".
104 doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
105 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
107 DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
110 .create(testedObject.handleDmaapMsg(message)) //
111 .expectSubscription() //
113 .verifyComplete(); //
// The PMS client must have been used for the delete call only.
115 verify(pmsClient).deleteForEntity(URL);
116 verifyNoMoreInteractions(pmsClient);
// The DMaaP client must have been used for one post only.
118 verify(dmaapClient).post(anyString(), anyString());
120 verifyNoMoreInteractions(dmaapClient);
// Handles a GET message with both clients stubbed to succeed, then checks that
// the PMS client received exactly one getForEntity(URL) call and the DMaaP client
// exactly one post(...) call.
// NOTE(review): the embedded numbering skips lines around the reactive chain below
// (its head and at least one step are not visible) - confirm against the complete file.
124 void successfulGet() throws IOException {
// Stub: PMS answers with a 200 entity, DMaaP post emits "OK".
125 doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
126 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
128 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
130 .create(testedObject.handleDmaapMsg(message)) //
131 .expectSubscription() //
133 .verifyComplete(); //
135 verify(pmsClient).getForEntity(URL);
136 verifyNoMoreInteractions(pmsClient);
138 verify(dmaapClient).post(anyString(), anyString());
139 verifyNoMoreInteractions(dmaapClient);
// Stubs the PMS GET call to fail with a WebClientResponseException (status 503, body
// "Unavailable") and checks that the text posted to the DMaaP client contains both the
// 503 status string and that error body.
// NOTE(review): the embedded numbering skips the line that starts the reactive chain
// below - confirm against the complete file.
143 void exceptionFromPmsWhenGet_thenPostError() throws IOException {
144 String errorBody = "Unavailable";
// Headers and charset are passed as null; the casts pick the constructor overload.
145 WebClientResponseException webClientResponseException = new WebClientResponseException(
146 HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
// The PMS call emits an error signal; the DMaaP post still succeeds.
147 doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
148 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
150 DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
152 .create(testedObject.handleDmaapMsg(message)) //
153 .expectSubscription() //
154 .verifyComplete(); //
// Capture the second argument of the DMaaP post to inspect what was sent.
156 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
157 verify(dmaapClient).post(anyString(), captor.capture());
158 String actualMessage = captor.getValue();
159 assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
160 .contains(errorBody);
// Handles a PUT message with both clients stubbed to succeed, then checks that
// the PMS client received exactly one putForEntity(URL, <sample payload string>) call
// and the DMaaP client exactly one post(...) call.
// NOTE(review): the embedded numbering skips lines around the reactive chain below
// (its head and at least one step are not visible) - confirm against the complete file.
164 void successfulPut() throws IOException {
// Stub: PMS answers with a 200 entity, DMaaP post emits "OK".
165 doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
166 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
168 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
170 .create(testedObject.handleDmaapMsg(message)) //
171 .expectSubscription() //
173 .verifyComplete(); //
// The body forwarded to PMS must equal the sample payload string.
175 verify(pmsClient).putForEntity(URL, payloadAsString());
176 verifyNoMoreInteractions(pmsClient);
178 verify(dmaapClient).post(anyString(), anyString());
179 verifyNoMoreInteractions(dmaapClient);
// Handles a POST message with both clients stubbed to succeed, then checks that
// the PMS client received exactly one postForEntity(URL, <sample payload string>) call
// and the DMaaP client exactly one post(...) call.
// NOTE(review): the embedded numbering skips lines around the reactive chain below
// (its head and at least one step are not visible) - confirm against the complete file.
183 void successfulPost() throws IOException {
// Stub: PMS answers with a 200 entity, DMaaP post emits "OK".
184 doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
185 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
187 DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
189 .create(testedObject.handleDmaapMsg(message)) //
190 .expectSubscription() //
192 .verifyComplete(); //
// The body forwarded to PMS must equal the sample payload string.
194 verify(pmsClient).postForEntity(URL, payloadAsString());
195 verifyNoMoreInteractions(pmsClient);
197 verify(dmaapClient).post(anyString(), anyString());
198 verifyNoMoreInteractions(dmaapClient);
// Stubs the PMS PUT call to answer with a 502 (BAD_GATEWAY) entity and checks that the
// text posted to the DMaaP client contains the BAD_GATEWAY status string.
// Despite the method name, the stub emits the 502 entity as a normal value
// (see notOkResponse()), not as an exception.
202 void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
204 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
205 doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
207 DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
// Subscribes and waits for the handler's Mono to finish before verifying.
208 testedObject.handleDmaapMsg(message).block();
210 verify(pmsClient).putForEntity(anyString(), anyString());
211 verifyNoMoreInteractions(pmsClient);
// Capture the second argument of the DMaaP post to inspect what was sent.
213 ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
214 verify(dmaapClient).post(anyString(), captor.capture());
215 String actualMessage = captor.getValue();
216 assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
217 .contains(HttpStatus.BAD_GATEWAY.toString());
219 verifyNoMoreInteractions(dmaapClient);
// Builds a PUT message directly through the builder, handles it, and asserts that the
// first log event captured for DmaapMessageHandler starts with
// "Expected payload in message from DMAAP: ".
// NOTE(review): the embedded numbering skips lines inside and after the builder chain,
// so how the payload is left unset and how the chain terminates are not visible here;
// the method may also continue past the last visible line - confirm against the complete file.
223 void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
224 DmaapRequestMessage message = DmaapRequestMessage.builder() //
225 .apiVersion("apiVersion") //
226 .correlationId("correlationId") //
227 .operation(DmaapRequestMessage.Operation.PUT) //
228 .originatorId("originatorId") //
230 .requestId("requestId") //
232 .timestamp("timestamp") //
// Presumably attaches an in-memory list appender to the handler's logger at WARN level;
// LoggingUtils is not visible here - confirm.
236 final ListAppender<ILoggingEvent> logAppender =
237 LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
239 doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
// Subscribes and waits for the handler's Mono to finish before inspecting the log.
240 testedObject.handleDmaapMsg(message).block();
242 assertThat(logAppender.list.get(0).getFormattedMessage())
243 .startsWith("Expected payload in message from DMAAP: ");