2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.pipelines;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
40 import javax.net.ssl.SSLException;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
61 import org.onap.bbs.event.processor.model.PnfAaiObject;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
67 import org.springframework.http.HttpStatus;
68 import org.springframework.http.ResponseEntity;
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
75 @SuppressWarnings("unchecked")
76 @DisplayName("CPE Authentication Pipeline Unit-Tests")
77 class CpeAuthenticationPipelineTest {
// Pipeline instance exercised by the tests in this class.
79 private CpeAuthenticationPipeline pipeline;
// Collaborators handed to the pipeline constructor; each one is created as a Mockito mock.
80 private ApplicationConfiguration configuration;
81 private DmaapCpeAuthenticationConsumerTask consumerTask;
82 private DmaapPublisherTask publisherTask;
83 private AaiClientTask aaiClientTask;
// Mocked reply object that tests return from the stubbed publisher task.
85 private ResponseEntity<String> responseEntity;
// NOTE(review): the header of the enclosing method is not visible in this listing;
// presumably this is the per-test fixture - confirm against the full file.
// Create a fresh mock for the publisher reply and for every pipeline collaborator.
90 responseEntity = Mockito.mock(ResponseEntity.class);
92 configuration = Mockito.mock(ApplicationConfiguration.class);
93 consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
94 publisherTask = Mockito.mock(DmaapPublisherTask.class);
95 aaiClientTask = Mockito.mock(AaiClientTask.class);
// Fixed close-loop control name and policy scope served by the mocked configuration.
97 when(configuration.getCpeAuthenticationCloseLoopControlName())
98 .thenReturn("controlName");
99 when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
100 .thenReturn("policyScope");
// Build the pipeline under test from the mocks; the last constructor argument is an empty HashMap.
102 pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
103 publisherTask, aaiClientTask, new HashMap<>());
// Scenario: the consumer task fails with EmptyDmaapResponseException for any task name.
// Checks that neither the A&AI client nor the publisher is touched.
107 void handleEmptyResponseFromDmaap() throws SSLException {
// Stub the pipeline timeout value (10) and make the consumer emit the error.
109 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
110 when(consumerTask.execute(anyString()))
111 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
// NOTE(review): the terminal StepVerifier step is not visible in this listing - confirm the asserted outcome.
113 StepVerifier.create(pipeline.executePipeline())
114 .expectSubscription()
// No downstream collaborator may have been invoked.
117 verifyZeroInteractions(aaiClientTask);
118 verifyZeroInteractions(publisherTask);
// Scenario: the consumer task returns a Flux that never emits and never completes,
// while the pipeline timeout value is stubbed to 1.
// Checks that neither the A&AI client nor the publisher is touched.
122 void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
125 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
126 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
127 .thenReturn(Flux.never());
// NOTE(review): the terminal StepVerifier step is not visible in this listing - confirm the asserted outcome.
130 StepVerifier.create(pipeline.executePipeline())
131 .expectSubscription()
// No downstream collaborator may have been invoked.
134 verifyZeroInteractions(aaiClientTask);
135 verifyZeroInteractions(publisherTask);
// Scenario: one event is consumed, but the PNF retrieval from A&AI returns a Mono that never
// emits, while the pipeline timeout value is stubbed to 1.
// Checks that the publisher is never touched.
139 void noResponseFromAai_PipelineTimesOut() throws SSLException {
// Field values for the single consumed event.
141 String pnfName = "olt1";
142 final String oldAuthenticationState = "outOfService";
143 final String newAuthenticationState = "inService";
144 final String stateInterface = "stateInterface";
145 final String rgwMacAddress = "00:0a:95:8d:78:16";
146 final String swVersion = "1.2";
148 // Prepare stubbed replies
// NOTE(review): the end of this builder chain is not visible in this listing.
149 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
150 .correlationId(pnfName)
151 .oldAuthenticationState(oldAuthenticationState)
152 .newAuthenticationState(newAuthenticationState)
153 .stateInterface(stateInterface)
154 .rgwMacAddress(rgwMacAddress)
155 .swVersion(swVersion)
// Consumer emits the event; any PNF retrieval hangs forever.
159 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
160 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
161 when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
// NOTE(review): the terminal StepVerifier step is not visible in this listing - confirm the asserted outcome.
164 StepVerifier.create(pipeline.executePipeline())
165 .expectSubscription()
// Nothing may have been published.
168 verifyZeroInteractions(publisherTask);
// Scenario: event consumption and both A&AI retrievals succeed, but the publisher returns a
// Mono that never emits, while the pipeline timeout value is stubbed to 1.
// Checks that the publisher was invoked exactly once.
172 void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
// Field values for the single consumed event and the related service instance.
174 String pnfName = "olt1";
175 final String oldAuthenticationState = "outOfService";
176 final String newAuthenticationState = "inService";
177 final String stateInterface = "stateInterface";
178 final String rgwMacAddress = "00:0a:95:8d:78:16";
179 final String swVersion = "1.2";
180 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
182 // Prepare stubbed replies
// NOTE(review): the end of this builder chain is not visible in this listing.
183 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
184 .correlationId(pnfName)
185 .oldAuthenticationState(oldAuthenticationState)
186 .newAuthenticationState(newAuthenticationState)
187 .stateInterface(stateInterface)
188 .rgwMacAddress(rgwMacAddress)
189 .swVersion(swVersion)
// A&AI objects whose PNF name, service instance id and RGW MAC address agree with the event.
192 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
193 ServiceInstanceAaiObject hsiCfsServiceInstance =
194 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Exact URL that the service-instance retrieval stub below is keyed on.
197 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
198 hsiCfsServiceInstance.getServiceInstanceId());
200 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
201 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
203 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
204 .thenReturn(Mono.just(pnfAaiObject));
// NOTE(review): the line that opens this stubbing call is not visible in this listing.
207 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
208 .thenReturn(Mono.just(hsiCfsServiceInstance));
// Publishing hangs forever.
210 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
212 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing - confirm the asserted outcome.
213 StepVerifier.create(pipeline.executePipeline())
214 .expectSubscription()
// The publish attempt must have happened once.
217 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
// Scenario: one event whose PNF and service instance data agree with it is consumed, and the
// publisher answers with a mocked reply reporting HttpStatus.OK.
// Checks that the next emitted item carries HttpStatus.OK and that publishing happened once.
221 void singleCorrectEvent_handleSuccessfully() throws SSLException {
// Field values for the single consumed event and the related service instance.
223 String pnfName = "olt1";
224 final String oldAuthenticationState = "outOfService";
225 final String newAuthenticationState = "inService";
226 final String stateInterface = "stateInterface";
227 final String rgwMacAddress = "00:0a:95:8d:78:16";
228 final String swVersion = "1.2";
229 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
231 // Prepare stubbed replies
// NOTE(review): the end of this builder chain is not visible in this listing.
232 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
233 .correlationId(pnfName)
234 .oldAuthenticationState(oldAuthenticationState)
235 .newAuthenticationState(newAuthenticationState)
236 .stateInterface(stateInterface)
237 .rgwMacAddress(rgwMacAddress)
238 .swVersion(swVersion)
// A&AI objects whose PNF name, service instance id and RGW MAC address agree with the event.
241 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
242 ServiceInstanceAaiObject hsiCfsServiceInstance =
243 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
// Exact URL that the service-instance retrieval stub below is keyed on.
246 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
247 hsiCfsServiceInstance.getServiceInstanceId());
249 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
250 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
252 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
253 .thenReturn(Mono.just(pnfAaiObject));
// NOTE(review): the line that opens this stubbing call is not visible in this listing.
256 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
257 .thenReturn(Mono.just(hsiCfsServiceInstance));
// Publisher replies with the mocked entity, which reports status OK.
259 when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
260 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
262 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing.
263 StepVerifier.create(pipeline.executePipeline())
264 .expectSubscription()
265 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
// Exactly one publish call is expected.
268 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
// Scenario: two events for two different PNFs are consumed; each has its own PNF object and
// service instance whose data agree with the event, and every publish call reports HttpStatus.OK.
// Checks that two items with HttpStatus.OK are emitted and that publishing happened twice.
272 void twoCorrectEvents_handleSuccessfully() throws SSLException {
// Field values; the two events differ in PNF name, RGW MAC address and service instance id.
274 String pnfName1 = "olt1";
275 String pnfName2 = "olt2";
276 final String oldAuthenticationState = "outOfService";
277 final String newAuthenticationState = "inService";
278 final String stateInterface = "stateInterface";
279 final String rgwMacAddress1 = "00:0a:95:8d:78:16";
280 final String rgwMacAddress2 = "00:0a:95:8d:78:17";
281 final String swVersion = "1.2";
282 String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
283 String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
285 // Prepare stubbed replies
// NOTE(review): the ends of both builder chains are not visible in this listing.
286 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
287 .correlationId(pnfName1)
288 .oldAuthenticationState(oldAuthenticationState)
289 .newAuthenticationState(newAuthenticationState)
290 .stateInterface(stateInterface)
291 .rgwMacAddress(rgwMacAddress1)
292 .swVersion(swVersion)
294 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
295 .correlationId(pnfName2)
296 .oldAuthenticationState(oldAuthenticationState)
297 .newAuthenticationState(newAuthenticationState)
298 .stateInterface(stateInterface)
299 .rgwMacAddress(rgwMacAddress2)
300 .swVersion(swVersion)
// One PNF object and one service instance per event, built from the matching values.
303 PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
304 PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
305 ServiceInstanceAaiObject hsiCfsServiceInstance1 =
306 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
307 ServiceInstanceAaiObject hsiCfsServiceInstance2 =
308 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
// Exact URLs that the A&AI stubs below are keyed on, so each request gets its own reply.
311 String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
312 String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
313 String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
314 hsiCfsServiceInstance1.getServiceInstanceId());
315 String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
316 hsiCfsServiceInstance2.getServiceInstanceId());
318 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
319 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
320 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
322 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
323 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
// NOTE(review): the lines that open the next two stubbing calls are not visible in this listing.
326 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
327 .thenReturn(Mono.just(hsiCfsServiceInstance1));
329 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
330 .thenReturn(Mono.just(hsiCfsServiceInstance2));
// Publisher replies with the mocked entity, which reports status OK.
332 when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
333 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
335 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing.
336 StepVerifier.create(pipeline.executePipeline())
337 .expectSubscription()
338 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
339 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
// One publish call per event is expected.
342 verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
// Scenario: one event is consumed, but the PNF retrieval from A&AI fails with AaiTaskException.
// Checks that the PNF retrieval was the only A&AI call and that nothing was published.
346 void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
// Field values for the single consumed event.
348 String pnfName = "olt1";
349 final String oldAuthenticationState = "outOfService";
350 final String newAuthenticationState = "inService";
351 final String stateInterface = "stateInterface";
352 final String rgwMacAddress = "00:0a:95:8d:78:16";
353 final String swVersion = "1.2";
355 // Prepare stubbed replies
// NOTE(review): the end of this builder chain is not visible in this listing.
356 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
357 .correlationId(pnfName)
358 .oldAuthenticationState(oldAuthenticationState)
359 .newAuthenticationState(newAuthenticationState)
360 .stateInterface(stateInterface)
361 .rgwMacAddress(rgwMacAddress)
362 .swVersion(swVersion)
// Consumer emits the event; any PNF retrieval signals an error.
366 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
367 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
368 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
369 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
371 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing - confirm the asserted outcome.
372 StepVerifier.create(pipeline.executePipeline())
373 .expectSubscription()
// Exactly one PNF retrieval, no further A&AI calls, and no publishing.
376 verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
377 verifyNoMoreInteractions(aaiClientTask);
378 verifyZeroInteractions(publisherTask);
// Scenario: two events are consumed. The first agrees with its A&AI data; the service instance
// for the second is built with an RGW MAC address value that differs from the one in the event.
// Checks that one item with HttpStatus.OK is emitted and that publishing happened once.
382 void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
// Field values; the two events differ in PNF name, RGW MAC address and service instance id.
384 String pnfName1 = "olt1";
385 String pnfName2 = "olt2";
386 final String oldAuthenticationState = "outOfService";
387 final String newAuthenticationState = "inService";
388 final String stateInterface = "stateInterface";
389 final String rgwMacAddress1 = "00:0a:95:8d:78:16";
390 final String rgwMacAddress2 = "00:0a:95:8d:78:17";
391 final String swVersion = "1.2";
392 String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
393 String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
395 // Prepare stubbed replies
// NOTE(review): the ends of both builder chains are not visible in this listing.
396 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
397 .correlationId(pnfName1)
398 .oldAuthenticationState(oldAuthenticationState)
399 .newAuthenticationState(newAuthenticationState)
400 .stateInterface(stateInterface)
401 .rgwMacAddress(rgwMacAddress1)
402 .swVersion(swVersion)
404 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
405 .correlationId(pnfName2)
406 .oldAuthenticationState(oldAuthenticationState)
407 .newAuthenticationState(newAuthenticationState)
408 .stateInterface(stateInterface)
409 .rgwMacAddress(rgwMacAddress2)
410 .swVersion(swVersion)
413 PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
414 PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
415 ServiceInstanceAaiObject hsiCfsServiceInstance1 =
416 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
// The second service instance deliberately carries a MAC value different from rgwMacAddress2.
417 ServiceInstanceAaiObject hsiCfsServiceInstance2 =
418 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
419 "Having unmatched RGW MAC address");
// Exact URLs that the A&AI stubs below are keyed on, so each request gets its own reply.
422 String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
423 String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
424 String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
425 hsiCfsServiceInstance1.getServiceInstanceId());
426 String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
427 hsiCfsServiceInstance2.getServiceInstanceId());
429 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
430 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
431 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
433 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
434 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
// NOTE(review): the lines that open the next two stubbing calls are not visible in this listing.
437 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
438 .thenReturn(Mono.just(hsiCfsServiceInstance1));
440 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
441 .thenReturn(Mono.just(hsiCfsServiceInstance2));
// Publisher replies with the mocked entity, which reports status OK.
443 when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
444 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
446 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing.
447 StepVerifier.create(pipeline.executePipeline())
448 .expectSubscription()
449 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
// Only one publish call is expected.
452 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
// Scenario: two events are consumed. The first PNF retrieval call returns a PNF object and the
// second call fails with AaiTaskException.
// Checks that one item with HttpStatus.OK is emitted, and that there were two PNF retrievals,
// one service-instance retrieval and one publish call.
456 void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
// Field values; the two events differ only in PNF name.
458 String pnfName1 = "olt1";
459 String pnfName2 = "olt2";
460 final String oldAuthenticationState = "outOfService";
461 final String newAuthenticationState = "inService";
462 final String stateInterface = "stateInterface";
463 final String rgwMacAddress = "00:0a:95:8d:78:16";
464 final String swVersion = "1.2";
465 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
467 // Prepare stubbed replies
// NOTE(review): the ends of both builder chains are not visible in this listing.
468 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
469 .correlationId(pnfName1)
470 .oldAuthenticationState(oldAuthenticationState)
471 .newAuthenticationState(newAuthenticationState)
472 .stateInterface(stateInterface)
473 .rgwMacAddress(rgwMacAddress)
474 .swVersion(swVersion)
476 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
477 .correlationId(pnfName2)
478 .oldAuthenticationState(oldAuthenticationState)
479 .newAuthenticationState(newAuthenticationState)
480 .stateInterface(stateInterface)
481 .rgwMacAddress(rgwMacAddress)
482 .swVersion(swVersion)
// A&AI data exists only for the first PNF name.
485 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
486 ServiceInstanceAaiObject hsiCfsServiceInstance =
487 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
// Exact URL that the service-instance retrieval stub below is keyed on.
490 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
491 hsiCfsServiceInstance.getServiceInstanceId());
493 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
494 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
495 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
// Consecutive stubbing: the first invocation succeeds, later invocations return the error.
496 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
497 .thenReturn(Mono.just(pnfAaiObject))
498 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
// NOTE(review): the line that opens this stubbing call is not visible in this listing.
500 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
501 .thenReturn(Mono.just(hsiCfsServiceInstance));
// Publisher replies with the mocked entity, which reports status OK.
503 when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
504 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
506 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing.
507 StepVerifier.create(pipeline.executePipeline())
508 .expectSubscription()
509 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
// Both PNF lookups were attempted; only one continued to service-instance retrieval and publishing.
512 verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
513 verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
514 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
// Scenario: two events are consumed. The first PNF retrieval call fails with AaiTaskException
// and the second call returns a PNF object.
// Checks that one item with HttpStatus.OK is emitted, and that there were two PNF retrievals,
// one service-instance retrieval and one publish call.
518 void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
// Field values; the two events differ only in PNF name.
520 String pnfName1 = "olt1";
521 String pnfName2 = "olt2";
522 final String oldAuthenticationState = "outOfService";
523 final String newAuthenticationState = "inService";
524 final String stateInterface = "stateInterface";
525 final String rgwMacAddress = "00:0a:95:8d:78:16";
526 final String swVersion = "1.2";
527 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
529 // Prepare stubbed replies
// NOTE(review): the ends of both builder chains are not visible in this listing.
530 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
531 .correlationId(pnfName1)
532 .oldAuthenticationState(oldAuthenticationState)
533 .newAuthenticationState(newAuthenticationState)
534 .stateInterface(stateInterface)
535 .rgwMacAddress(rgwMacAddress)
536 .swVersion(swVersion)
538 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
539 .correlationId(pnfName2)
540 .oldAuthenticationState(oldAuthenticationState)
541 .newAuthenticationState(newAuthenticationState)
542 .stateInterface(stateInterface)
543 .rgwMacAddress(rgwMacAddress)
544 .swVersion(swVersion)
// A&AI data exists only for the second PNF name.
547 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
548 ServiceInstanceAaiObject hsiCfsServiceInstance =
549 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
// Exact URL that the service-instance retrieval stub below is keyed on.
552 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
553 hsiCfsServiceInstance.getServiceInstanceId());
555 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
556 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
557 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
// Consecutive stubbing: the first invocation returns the error, later invocations succeed.
558 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
559 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
560 .thenReturn(Mono.just(pnfAaiObject));
// NOTE(review): the line that opens this stubbing call is not visible in this listing.
562 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
563 .thenReturn(Mono.just(hsiCfsServiceInstance));
// Publisher replies with the mocked entity, which reports status OK.
565 when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
566 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
568 // Execute the pipeline
// NOTE(review): the terminal StepVerifier step is not visible in this listing.
569 StepVerifier.create(pipeline.executePipeline())
570 .expectSubscription()
571 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
// Both PNF lookups were attempted; only one continued to service-instance retrieval and publishing.
574 verify(aaiClientTask, times(2))
575 .executePnfRetrieval(anyString(), anyString());
576 verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
577 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
// Test helper: builds a PnfAaiObject flagged as in maintenance whose single relationship
// entry points at the service instance identified by hsiCfsServiceInstanceId.
// NOTE(review): several builder lines (closing parentheses, build calls and possibly further
// setters that consume pnfName) are not visible in this listing - confirm against the full file.
580 private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
582 // Build Relationship Data
583 RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
584 ImmutableRelationshipEntryAaiObject.builder()
585 .relatedTo("service-instance")
// The related link ends with the supplied service instance id.
586 .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
587 + "/service-subscription/BBS-CFS/service-instances"
588 + "/service-instance/" + hsiCfsServiceInstanceId)
589 .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
// Three key/value pairs: customer id, service type and service instance id.
590 .relationshipData(Arrays.asList(
591 ImmutableRelationshipDataEntryAaiObject.builder()
592 .relationshipKey("customer.global-customer-id")
593 .relationshipValue("Demonstration").build(),
594 ImmutableRelationshipDataEntryAaiObject.builder()
595 .relationshipKey("service-subscription.service-type")
596 .relationshipValue("BBS-CFS").build(),
597 ImmutableRelationshipDataEntryAaiObject.builder()
598 .relationshipKey("service-instance.service-instance-id")
599 .relationshipValue(hsiCfsServiceInstanceId).build())
// Single related-to property carrying a fixed service instance name.
601 .relatedToProperties(Collections.singletonList(
602 ImmutablePropertyAaiObject.builder()
603 .propertyKey("service-instance.service-instance-name")
604 .propertyValue("bbs-instance").build())
// Wrap the single entry in a relationship list.
608 RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
609 .relationshipEntries(Collections.singletonList(relationshipEntry))
612 // Finally construct PNF object data
613 return ImmutablePnfAaiObject.builder()
615 .isInMaintenance(true)
616 .relationshipListAaiObject(relationshipListAaiObject)
// Test helper: builds a ServiceInstanceAaiObject with the given id, orchestration status
// "active", one relationship entry linking to a PNF, and one metadata entry named
// "rgw-mac-address" holding rgwMacAddress.
// NOTE(review): the body uses pnfName, but the parameter line declaring it is not visible in
// this listing; some builder lines are missing as well - confirm against the full file.
620 private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
622 String rgwMacAddress) {
623 String orchestrationStatus = "active";
// Relationship entry whose link and "pnf.pnf-name" value both carry pnfName.
625 RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
626 ImmutableRelationshipEntryAaiObject.builder()
628 .relatedLink("/pnfs/pnf/" + pnfName)
629 .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
630 .relationshipKey("pnf.pnf-name")
631 .relationshipValue(pnfName).build()))
// Wrap the single entry in a relationship list.
634 RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
635 .relationshipEntries(Collections.singletonList(relationshipEntry))
// Metadata entry that stores the RGW MAC address for this service instance.
638 MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
639 ImmutableMetadataEntryAaiObject.builder()
640 .metaname("rgw-mac-address")
641 .metavalue(rgwMacAddress)
// Wrap the single entry in a metadata list.
644 MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
645 .metadataEntries(Collections.singletonList(metadataEntry))
648 // Finally construct Service Instance object data
649 return ImmutableServiceInstanceAaiObject.builder()
650 .serviceInstanceId(hsiCfsServiceInstanceId)
651 .orchestrationStatus(orchestrationStatus)
652 .relationshipListAaiObject(relationshipListAaiObject)
653 .metadataListAaiObject(metadataListAaiObject)