2 * ============LICENSE_START=======================================================
3 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.bbs.event.processor.pipelines;
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
40 import javax.net.ssl.SSLException;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
61 import org.onap.bbs.event.processor.model.PnfAaiObject;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
67 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
68 import org.springframework.http.HttpStatus;
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
74 @DisplayName("CPE Authentication Pipeline Unit-Tests")
75 class CpeAuthenticationPipelineTest {
77 private CpeAuthenticationPipeline pipeline;
78 private ApplicationConfiguration configuration;
79 private DmaapCpeAuthenticationConsumerTask consumerTask;
80 private DmaapPublisherTask publisherTask;
81 private AaiClientTask aaiClientTask;
83 private HttpResponse httpResponse;
88 httpResponse = Mockito.mock(HttpResponse.class);
90 configuration = Mockito.mock(ApplicationConfiguration.class);
91 consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
92 publisherTask = Mockito.mock(DmaapPublisherTask.class);
93 aaiClientTask = Mockito.mock(AaiClientTask.class);
95 when(configuration.getCpeAuthenticationCloseLoopControlName())
96 .thenReturn("controlName");
97 when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
98 .thenReturn("policyScope");
99 when(configuration.getPolicyVersion())
100 .thenReturn("1.0.0");
101 when(configuration.getCloseLoopTargetType())
103 when(configuration.getCloseLoopEventStatus())
104 .thenReturn("ONSET");
105 when(configuration.getCloseLoopVersion())
106 .thenReturn("1.0.2");
107 when(configuration.getCloseLoopTarget())
108 .thenReturn("CL-Target");
109 when(configuration.getCloseLoopOriginator())
110 .thenReturn("DCAE-BBS-ep");
112 pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
113 publisherTask, aaiClientTask, new HashMap<>());
117 void handleEmptyResponseFromDmaap() throws SSLException {
119 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
120 when(consumerTask.execute(anyString()))
121 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
123 StepVerifier.create(pipeline.executePipeline())
124 .expectSubscription()
127 verifyZeroInteractions(aaiClientTask);
128 verifyZeroInteractions(publisherTask);
132 void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
135 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
136 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
137 .thenReturn(Flux.never());
140 StepVerifier.create(pipeline.executePipeline())
141 .expectSubscription()
144 verifyZeroInteractions(aaiClientTask);
145 verifyZeroInteractions(publisherTask);
149 void noResponseFromAai_PipelineTimesOut() throws SSLException {
151 String pnfName = "olt1";
152 final String oldAuthenticationState = "outOfService";
153 final String newAuthenticationState = "inService";
154 final String stateInterface = "stateInterface";
155 final String rgwMacAddress = "00:0a:95:8d:78:16";
156 final String swVersion = "1.2";
158 // Prepare stubbed replies
159 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
160 .correlationId(pnfName)
161 .oldAuthenticationState(oldAuthenticationState)
162 .newAuthenticationState(newAuthenticationState)
163 .stateInterface(stateInterface)
164 .rgwMacAddress(rgwMacAddress)
165 .swVersion(swVersion)
169 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
170 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
171 when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
174 StepVerifier.create(pipeline.executePipeline())
175 .expectSubscription()
178 verifyZeroInteractions(publisherTask);
182 void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
184 String pnfName = "olt1";
185 final String oldAuthenticationState = "outOfService";
186 final String newAuthenticationState = "inService";
187 final String stateInterface = "stateInterface";
188 final String rgwMacAddress = "00:0a:95:8d:78:16";
189 final String swVersion = "1.2";
190 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
192 // Prepare stubbed replies
193 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
194 .correlationId(pnfName)
195 .oldAuthenticationState(oldAuthenticationState)
196 .newAuthenticationState(newAuthenticationState)
197 .stateInterface(stateInterface)
198 .rgwMacAddress(rgwMacAddress)
199 .swVersion(swVersion)
202 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
203 ServiceInstanceAaiObject hsiCfsServiceInstance =
204 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
207 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
208 hsiCfsServiceInstance.getServiceInstanceId());
210 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
211 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
213 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
214 .thenReturn(Mono.just(pnfAaiObject));
217 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
218 .thenReturn(Mono.just(hsiCfsServiceInstance));
220 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
222 // Execute the pipeline
223 StepVerifier.create(pipeline.executePipeline())
224 .expectSubscription()
227 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
231 void singleCorrectEvent_handleSuccessfully() throws SSLException {
233 String pnfName = "olt1";
234 final String oldAuthenticationState = "outOfService";
235 final String newAuthenticationState = "inService";
236 final String stateInterface = "stateInterface";
237 final String rgwMacAddress = "00:0a:95:8d:78:16";
238 final String swVersion = "1.2";
239 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
241 // Prepare stubbed replies
242 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
243 .correlationId(pnfName)
244 .oldAuthenticationState(oldAuthenticationState)
245 .newAuthenticationState(newAuthenticationState)
246 .stateInterface(stateInterface)
247 .rgwMacAddress(rgwMacAddress)
248 .swVersion(swVersion)
251 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
252 ServiceInstanceAaiObject hsiCfsServiceInstance =
253 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
256 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
257 hsiCfsServiceInstance.getServiceInstanceId());
259 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
260 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
262 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
263 .thenReturn(Mono.just(pnfAaiObject));
266 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
267 .thenReturn(Mono.just(hsiCfsServiceInstance));
269 when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
270 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
272 // Execute the pipeline
273 StepVerifier.create(pipeline.executePipeline())
274 .expectSubscription()
275 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
278 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
282 void twoCorrectEvents_handleSuccessfully() throws SSLException {
284 String pnfName1 = "olt1";
285 String pnfName2 = "olt2";
286 final String oldAuthenticationState = "outOfService";
287 final String newAuthenticationState = "inService";
288 final String stateInterface = "stateInterface";
289 final String rgwMacAddress1 = "00:0a:95:8d:78:16";
290 final String rgwMacAddress2 = "00:0a:95:8d:78:17";
291 final String swVersion = "1.2";
292 String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
293 String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
295 // Prepare stubbed replies
296 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
297 .correlationId(pnfName1)
298 .oldAuthenticationState(oldAuthenticationState)
299 .newAuthenticationState(newAuthenticationState)
300 .stateInterface(stateInterface)
301 .rgwMacAddress(rgwMacAddress1)
302 .swVersion(swVersion)
304 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
305 .correlationId(pnfName2)
306 .oldAuthenticationState(oldAuthenticationState)
307 .newAuthenticationState(newAuthenticationState)
308 .stateInterface(stateInterface)
309 .rgwMacAddress(rgwMacAddress2)
310 .swVersion(swVersion)
313 PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
314 PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
315 ServiceInstanceAaiObject hsiCfsServiceInstance1 =
316 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
317 ServiceInstanceAaiObject hsiCfsServiceInstance2 =
318 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
321 String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
322 String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
323 String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
324 hsiCfsServiceInstance1.getServiceInstanceId());
325 String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
326 hsiCfsServiceInstance2.getServiceInstanceId());
328 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
329 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
330 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
332 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
333 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
336 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
337 .thenReturn(Mono.just(hsiCfsServiceInstance1));
339 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
340 .thenReturn(Mono.just(hsiCfsServiceInstance2));
342 when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
343 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
345 // Execute the pipeline
346 StepVerifier.create(pipeline.executePipeline())
347 .expectSubscription()
348 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
349 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
352 verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
356 void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
358 String pnfName = "olt1";
359 final String oldAuthenticationState = "outOfService";
360 final String newAuthenticationState = "inService";
361 final String stateInterface = "stateInterface";
362 final String rgwMacAddress = "00:0a:95:8d:78:16";
363 final String swVersion = "1.2";
365 // Prepare stubbed replies
366 CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
367 .correlationId(pnfName)
368 .oldAuthenticationState(oldAuthenticationState)
369 .newAuthenticationState(newAuthenticationState)
370 .stateInterface(stateInterface)
371 .rgwMacAddress(rgwMacAddress)
372 .swVersion(swVersion)
376 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
377 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
378 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
379 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
381 // Execute the pipeline
382 StepVerifier.create(pipeline.executePipeline())
383 .expectSubscription()
386 verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
387 verifyNoMoreInteractions(aaiClientTask);
388 verifyZeroInteractions(publisherTask);
392 void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
394 String pnfName1 = "olt1";
395 String pnfName2 = "olt2";
396 final String oldAuthenticationState = "outOfService";
397 final String newAuthenticationState = "inService";
398 final String stateInterface = "stateInterface";
399 final String rgwMacAddress1 = "00:0a:95:8d:78:16";
400 final String rgwMacAddress2 = "00:0a:95:8d:78:17";
401 final String swVersion = "1.2";
402 String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
403 String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
405 // Prepare stubbed replies
406 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
407 .correlationId(pnfName1)
408 .oldAuthenticationState(oldAuthenticationState)
409 .newAuthenticationState(newAuthenticationState)
410 .stateInterface(stateInterface)
411 .rgwMacAddress(rgwMacAddress1)
412 .swVersion(swVersion)
414 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
415 .correlationId(pnfName2)
416 .oldAuthenticationState(oldAuthenticationState)
417 .newAuthenticationState(newAuthenticationState)
418 .stateInterface(stateInterface)
419 .rgwMacAddress(rgwMacAddress2)
420 .swVersion(swVersion)
423 PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
424 PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
425 ServiceInstanceAaiObject hsiCfsServiceInstance1 =
426 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
427 ServiceInstanceAaiObject hsiCfsServiceInstance2 =
428 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
429 "Having unmatched RGW MAC address");
432 String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
433 String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
434 String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
435 hsiCfsServiceInstance1.getServiceInstanceId());
436 String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
437 hsiCfsServiceInstance2.getServiceInstanceId());
439 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
440 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
441 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
443 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
444 when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
447 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
448 .thenReturn(Mono.just(hsiCfsServiceInstance1));
450 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
451 .thenReturn(Mono.just(hsiCfsServiceInstance2));
453 when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
454 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
456 // Execute the pipeline
457 StepVerifier.create(pipeline.executePipeline())
458 .expectSubscription()
459 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
462 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
466 void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
468 String pnfName1 = "olt1";
469 String pnfName2 = "olt2";
470 final String oldAuthenticationState = "outOfService";
471 final String newAuthenticationState = "inService";
472 final String stateInterface = "stateInterface";
473 final String rgwMacAddress = "00:0a:95:8d:78:16";
474 final String swVersion = "1.2";
475 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
477 // Prepare stubbed replies
478 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
479 .correlationId(pnfName1)
480 .oldAuthenticationState(oldAuthenticationState)
481 .newAuthenticationState(newAuthenticationState)
482 .stateInterface(stateInterface)
483 .rgwMacAddress(rgwMacAddress)
484 .swVersion(swVersion)
486 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
487 .correlationId(pnfName2)
488 .oldAuthenticationState(oldAuthenticationState)
489 .newAuthenticationState(newAuthenticationState)
490 .stateInterface(stateInterface)
491 .rgwMacAddress(rgwMacAddress)
492 .swVersion(swVersion)
495 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
496 ServiceInstanceAaiObject hsiCfsServiceInstance =
497 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
500 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
501 hsiCfsServiceInstance.getServiceInstanceId());
503 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
504 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
505 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
506 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
507 .thenReturn(Mono.just(pnfAaiObject))
508 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
510 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
511 .thenReturn(Mono.just(hsiCfsServiceInstance));
513 when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
514 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
516 // Execute the pipeline
517 StepVerifier.create(pipeline.executePipeline())
518 .expectSubscription()
519 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
522 verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
523 verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
524 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
528 void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
530 String pnfName1 = "olt1";
531 String pnfName2 = "olt2";
532 final String oldAuthenticationState = "outOfService";
533 final String newAuthenticationState = "inService";
534 final String stateInterface = "stateInterface";
535 final String rgwMacAddress = "00:0a:95:8d:78:16";
536 final String swVersion = "1.2";
537 String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
539 // Prepare stubbed replies
540 CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
541 .correlationId(pnfName1)
542 .oldAuthenticationState(oldAuthenticationState)
543 .newAuthenticationState(newAuthenticationState)
544 .stateInterface(stateInterface)
545 .rgwMacAddress(rgwMacAddress)
546 .swVersion(swVersion)
548 CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
549 .correlationId(pnfName2)
550 .oldAuthenticationState(oldAuthenticationState)
551 .newAuthenticationState(newAuthenticationState)
552 .stateInterface(stateInterface)
553 .rgwMacAddress(rgwMacAddress)
554 .swVersion(swVersion)
557 PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
558 ServiceInstanceAaiObject hsiCfsServiceInstance =
559 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
562 String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
563 hsiCfsServiceInstance.getServiceInstanceId());
565 when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
566 when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
567 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
568 when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
569 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
570 .thenReturn(Mono.just(pnfAaiObject));
572 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
573 .thenReturn(Mono.just(hsiCfsServiceInstance));
575 when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
576 when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
578 // Execute the pipeline
579 StepVerifier.create(pipeline.executePipeline())
580 .expectSubscription()
581 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
584 verify(aaiClientTask, times(2))
585 .executePnfRetrieval(anyString(), anyString());
586 verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
587 verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
590 private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
592 // Build Relationship Data
593 RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
594 ImmutableRelationshipEntryAaiObject.builder()
595 .relatedTo("service-instance")
596 .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
597 + "/service-subscription/BBS-CFS/service-instances"
598 + "/service-instance/" + hsiCfsServiceInstanceId)
599 .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
600 .relationshipData(Arrays.asList(
601 ImmutableRelationshipDataEntryAaiObject.builder()
602 .relationshipKey("customer.global-customer-id")
603 .relationshipValue("Demonstration").build(),
604 ImmutableRelationshipDataEntryAaiObject.builder()
605 .relationshipKey("service-subscription.service-type")
606 .relationshipValue("BBS-CFS").build(),
607 ImmutableRelationshipDataEntryAaiObject.builder()
608 .relationshipKey("service-instance.service-instance-id")
609 .relationshipValue(hsiCfsServiceInstanceId).build())
611 .relatedToProperties(Collections.singletonList(
612 ImmutablePropertyAaiObject.builder()
613 .propertyKey("service-instance.service-instance-name")
614 .propertyValue("bbs-instance").build())
618 RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
619 .relationshipEntries(Collections.singletonList(relationshipEntry))
622 // Finally construct PNF object data
623 return ImmutablePnfAaiObject.builder()
625 .isInMaintenance(true)
626 .relationshipListAaiObject(relationshipListAaiObject)
630 private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
632 String rgwMacAddress) {
633 String orchestrationStatus = "active";
635 RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
636 ImmutableRelationshipEntryAaiObject.builder()
638 .relatedLink("/pnfs/pnf/" + pnfName)
639 .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
640 .relationshipKey("pnf.pnf-name")
641 .relationshipValue(pnfName).build()))
644 RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
645 .relationshipEntries(Collections.singletonList(relationshipEntry))
648 MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
649 ImmutableMetadataEntryAaiObject.builder()
650 .metaname("rgw-mac-address")
651 .metavalue(rgwMacAddress)
654 MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
655 .metadataEntries(Collections.singletonList(metadataEntry))
658 // Finally construct Service Instance object data
659 return ImmutableServiceInstanceAaiObject.builder()
660 .serviceInstanceId(hsiCfsServiceInstanceId)
661 .orchestrationStatus(orchestrationStatus)
662 .relationshipListAaiObject(relationshipListAaiObject)
663 .metadataListAaiObject(metadataListAaiObject)