76d9659cecbfd9216e3c46fb55b3893877a5826d
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.pipelines;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
39
40 import javax.net.ssl.SSLException;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
61 import org.onap.bbs.event.processor.model.PnfAaiObject;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
67 import org.springframework.http.HttpStatus;
68 import org.springframework.http.ResponseEntity;
69
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
73
74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
75 @SuppressWarnings("unchecked")
76 @DisplayName("CPE Authentication Pipeline Unit-Tests")
77 class CpeAuthenticationPipelineTest {
78
79     private CpeAuthenticationPipeline pipeline;
80     private ApplicationConfiguration configuration;
81     private DmaapCpeAuthenticationConsumerTask consumerTask;
82     private DmaapPublisherTask publisherTask;
83     private AaiClientTask aaiClientTask;
84
85     private ResponseEntity<String> responseEntity;
86
87     @BeforeEach
88     void setup() {
89
90         responseEntity = Mockito.mock(ResponseEntity.class);
91
92         configuration = Mockito.mock(ApplicationConfiguration.class);
93         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
95         aaiClientTask = Mockito.mock(AaiClientTask.class);
96
97         when(configuration.getCpeAuthenticationCloseLoopControlName())
98                 .thenReturn("controlName");
99         when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
100                 .thenReturn("policyScope");
101         when(configuration.getPolicyVersion())
102                 .thenReturn("1.0.0");
103         when(configuration.getCloseLoopTargetType())
104                 .thenReturn("VM");
105         when(configuration.getCloseLoopEventStatus())
106                 .thenReturn("ONSET");
107         when(configuration.getCloseLoopVersion())
108                 .thenReturn("1.0.2");
109         when(configuration.getCloseLoopTarget())
110                 .thenReturn("CL-Target");
111         when(configuration.getCloseLoopOriginator())
112                 .thenReturn("DCAE-BBS-ep");
113
114         pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
115                 publisherTask, aaiClientTask, new HashMap<>());
116     }
117
118     @Test
119     void handleEmptyResponseFromDmaap() throws SSLException {
120
121         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
122         when(consumerTask.execute(anyString()))
123                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
124
125         StepVerifier.create(pipeline.executePipeline())
126                 .expectSubscription()
127                 .verifyComplete();
128
129         verifyZeroInteractions(aaiClientTask);
130         verifyZeroInteractions(publisherTask);
131     }
132
133     @Test
134     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
135
136         // Prepare mocks
137         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
138         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
139                 .thenReturn(Flux.never());
140
141         // Execute pipeline
142         StepVerifier.create(pipeline.executePipeline())
143                 .expectSubscription()
144                 .verifyComplete();
145
146         verifyZeroInteractions(aaiClientTask);
147         verifyZeroInteractions(publisherTask);
148     }
149
150     @Test
151     void noResponseFromAai_PipelineTimesOut() throws SSLException {
152
153         String pnfName = "olt1";
154         final String oldAuthenticationState = "outOfService";
155         final String newAuthenticationState = "inService";
156         final String stateInterface = "stateInterface";
157         final String rgwMacAddress = "00:0a:95:8d:78:16";
158         final String swVersion = "1.2";
159
160         // Prepare stubbed replies
161         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
162                 .correlationId(pnfName)
163                 .oldAuthenticationState(oldAuthenticationState)
164                 .newAuthenticationState(newAuthenticationState)
165                 .stateInterface(stateInterface)
166                 .rgwMacAddress(rgwMacAddress)
167                 .swVersion(swVersion)
168                 .build();
169
170         // Prepare mocks
171         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
172         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
173         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
174
175         // Execute pipeline
176         StepVerifier.create(pipeline.executePipeline())
177                 .expectSubscription()
178                 .verifyComplete();
179
180         verifyZeroInteractions(publisherTask);
181     }
182
183     @Test
184     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
185
186         String pnfName = "olt1";
187         final String oldAuthenticationState = "outOfService";
188         final String newAuthenticationState = "inService";
189         final String stateInterface = "stateInterface";
190         final String rgwMacAddress = "00:0a:95:8d:78:16";
191         final String swVersion = "1.2";
192         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
193
194         // Prepare stubbed replies
195         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
196                 .correlationId(pnfName)
197                 .oldAuthenticationState(oldAuthenticationState)
198                 .newAuthenticationState(newAuthenticationState)
199                 .stateInterface(stateInterface)
200                 .rgwMacAddress(rgwMacAddress)
201                 .swVersion(swVersion)
202                 .build();
203
204         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
205         ServiceInstanceAaiObject hsiCfsServiceInstance =
206                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
207
208         // Prepare Mocks
209         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
210                 hsiCfsServiceInstance.getServiceInstanceId());
211
212         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
213         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
214
215         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
216                 .thenReturn(Mono.just(pnfAaiObject));
217
218         when(aaiClientTask
219                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
220                 .thenReturn(Mono.just(hsiCfsServiceInstance));
221
222         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
223
224         // Execute the pipeline
225         StepVerifier.create(pipeline.executePipeline())
226                 .expectSubscription()
227                 .verifyComplete();
228
229         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
230     }
231
232     @Test
233     void singleCorrectEvent_handleSuccessfully() throws SSLException {
234
235         String pnfName = "olt1";
236         final String oldAuthenticationState = "outOfService";
237         final String newAuthenticationState = "inService";
238         final String stateInterface = "stateInterface";
239         final String rgwMacAddress = "00:0a:95:8d:78:16";
240         final String swVersion = "1.2";
241         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
242
243         // Prepare stubbed replies
244         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
245                 .correlationId(pnfName)
246                 .oldAuthenticationState(oldAuthenticationState)
247                 .newAuthenticationState(newAuthenticationState)
248                 .stateInterface(stateInterface)
249                 .rgwMacAddress(rgwMacAddress)
250                 .swVersion(swVersion)
251                 .build();
252
253         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
254         ServiceInstanceAaiObject hsiCfsServiceInstance =
255                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
256
257         // Prepare Mocks
258         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
259                 hsiCfsServiceInstance.getServiceInstanceId());
260
261         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
262         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
263
264         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
265                 .thenReturn(Mono.just(pnfAaiObject));
266
267         when(aaiClientTask
268                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
269                 .thenReturn(Mono.just(hsiCfsServiceInstance));
270
271         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
272         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
273
274         // Execute the pipeline
275         StepVerifier.create(pipeline.executePipeline())
276                 .expectSubscription()
277                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
278                 .verifyComplete();
279
280         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
281     }
282
283     @Test
284     void twoCorrectEvents_handleSuccessfully() throws SSLException {
285
286         String pnfName1 = "olt1";
287         String pnfName2 = "olt2";
288         final String oldAuthenticationState = "outOfService";
289         final String newAuthenticationState = "inService";
290         final String stateInterface = "stateInterface";
291         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
292         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
293         final String swVersion = "1.2";
294         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
295         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
296
297         // Prepare stubbed replies
298         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
299                 .correlationId(pnfName1)
300                 .oldAuthenticationState(oldAuthenticationState)
301                 .newAuthenticationState(newAuthenticationState)
302                 .stateInterface(stateInterface)
303                 .rgwMacAddress(rgwMacAddress1)
304                 .swVersion(swVersion)
305                 .build();
306         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
307                 .correlationId(pnfName2)
308                 .oldAuthenticationState(oldAuthenticationState)
309                 .newAuthenticationState(newAuthenticationState)
310                 .stateInterface(stateInterface)
311                 .rgwMacAddress(rgwMacAddress2)
312                 .swVersion(swVersion)
313                 .build();
314
315         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
316         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
317         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
318                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
319         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
320                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
321
322         // Prepare Mocks
323         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
324         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
325         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
326                 hsiCfsServiceInstance1.getServiceInstanceId());
327         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
328                 hsiCfsServiceInstance2.getServiceInstanceId());
329
330         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
331         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
332                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
333
334         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
335         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
336
337         when(aaiClientTask
338                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
339                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
340         when(aaiClientTask
341                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
342                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
343
344         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
345         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
346
347         // Execute the pipeline
348         StepVerifier.create(pipeline.executePipeline())
349                 .expectSubscription()
350                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
351                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
352                 .verifyComplete();
353
354         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
355     }
356
357     @Test
358     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
359
360         String pnfName = "olt1";
361         final String oldAuthenticationState = "outOfService";
362         final String newAuthenticationState = "inService";
363         final String stateInterface = "stateInterface";
364         final String rgwMacAddress = "00:0a:95:8d:78:16";
365         final String swVersion = "1.2";
366
367         // Prepare stubbed replies
368         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
369                 .correlationId(pnfName)
370                 .oldAuthenticationState(oldAuthenticationState)
371                 .newAuthenticationState(newAuthenticationState)
372                 .stateInterface(stateInterface)
373                 .rgwMacAddress(rgwMacAddress)
374                 .swVersion(swVersion)
375                 .build();
376
377         // Prepare Mocks
378         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
379         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
380         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
381                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
382
383         // Execute the pipeline
384         StepVerifier.create(pipeline.executePipeline())
385                 .expectSubscription()
386                 .verifyComplete();
387
388         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
389         verifyNoMoreInteractions(aaiClientTask);
390         verifyZeroInteractions(publisherTask);
391     }
392
393     @Test
394     void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
395
396         String pnfName1 = "olt1";
397         String pnfName2 = "olt2";
398         final String oldAuthenticationState = "outOfService";
399         final String newAuthenticationState = "inService";
400         final String stateInterface = "stateInterface";
401         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
402         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
403         final String swVersion = "1.2";
404         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
405         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
406
407         // Prepare stubbed replies
408         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
409                 .correlationId(pnfName1)
410                 .oldAuthenticationState(oldAuthenticationState)
411                 .newAuthenticationState(newAuthenticationState)
412                 .stateInterface(stateInterface)
413                 .rgwMacAddress(rgwMacAddress1)
414                 .swVersion(swVersion)
415                 .build();
416         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
417                 .correlationId(pnfName2)
418                 .oldAuthenticationState(oldAuthenticationState)
419                 .newAuthenticationState(newAuthenticationState)
420                 .stateInterface(stateInterface)
421                 .rgwMacAddress(rgwMacAddress2)
422                 .swVersion(swVersion)
423                 .build();
424
425         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
426         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
427         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
428                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
429         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
430                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
431                         "Having unmatched RGW MAC address");
432
433         // Prepare Mocks
434         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
435         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
436         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
437                 hsiCfsServiceInstance1.getServiceInstanceId());
438         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
439                 hsiCfsServiceInstance2.getServiceInstanceId());
440
441         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
442         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
443                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
444
445         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
446         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
447
448         when(aaiClientTask
449                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
450                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
451         when(aaiClientTask
452                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
453                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
454
455         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
456         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
457
458         // Execute the pipeline
459         StepVerifier.create(pipeline.executePipeline())
460                 .expectSubscription()
461                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
462                 .verifyComplete();
463
464         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
465     }
466
467     @Test
468     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
469
470         String pnfName1 = "olt1";
471         String pnfName2 = "olt2";
472         final String oldAuthenticationState = "outOfService";
473         final String newAuthenticationState = "inService";
474         final String stateInterface = "stateInterface";
475         final String rgwMacAddress = "00:0a:95:8d:78:16";
476         final String swVersion = "1.2";
477         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
478
479         // Prepare stubbed replies
480         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
481                 .correlationId(pnfName1)
482                 .oldAuthenticationState(oldAuthenticationState)
483                 .newAuthenticationState(newAuthenticationState)
484                 .stateInterface(stateInterface)
485                 .rgwMacAddress(rgwMacAddress)
486                 .swVersion(swVersion)
487                 .build();
488         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
489                 .correlationId(pnfName2)
490                 .oldAuthenticationState(oldAuthenticationState)
491                 .newAuthenticationState(newAuthenticationState)
492                 .stateInterface(stateInterface)
493                 .rgwMacAddress(rgwMacAddress)
494                 .swVersion(swVersion)
495                 .build();
496
497         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
498         ServiceInstanceAaiObject hsiCfsServiceInstance =
499                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
500
501         // Prepare Mocks
502         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
503                 hsiCfsServiceInstance.getServiceInstanceId());
504
505         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
506         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
507                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
508         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
509                 .thenReturn(Mono.just(pnfAaiObject))
510                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
511         when(aaiClientTask
512                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
513                 .thenReturn(Mono.just(hsiCfsServiceInstance));
514
515         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
516         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
517
518         // Execute the pipeline
519         StepVerifier.create(pipeline.executePipeline())
520                 .expectSubscription()
521                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
522                 .verifyComplete();
523
524         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
525         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
526         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
527     }
528
529     @Test
530     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
531
532         String pnfName1 = "olt1";
533         String pnfName2 = "olt2";
534         final String oldAuthenticationState = "outOfService";
535         final String newAuthenticationState = "inService";
536         final String stateInterface = "stateInterface";
537         final String rgwMacAddress = "00:0a:95:8d:78:16";
538         final String swVersion = "1.2";
539         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
540
541         // Prepare stubbed replies
542         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
543                 .correlationId(pnfName1)
544                 .oldAuthenticationState(oldAuthenticationState)
545                 .newAuthenticationState(newAuthenticationState)
546                 .stateInterface(stateInterface)
547                 .rgwMacAddress(rgwMacAddress)
548                 .swVersion(swVersion)
549                 .build();
550         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
551                 .correlationId(pnfName2)
552                 .oldAuthenticationState(oldAuthenticationState)
553                 .newAuthenticationState(newAuthenticationState)
554                 .stateInterface(stateInterface)
555                 .rgwMacAddress(rgwMacAddress)
556                 .swVersion(swVersion)
557                 .build();
558
559         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
560         ServiceInstanceAaiObject hsiCfsServiceInstance =
561                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
562
563         // Prepare Mocks
564         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
565                 hsiCfsServiceInstance.getServiceInstanceId());
566
567         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
568         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
569                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
570         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
571                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
572                 .thenReturn(Mono.just(pnfAaiObject));
573         when(aaiClientTask
574                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
575                 .thenReturn(Mono.just(hsiCfsServiceInstance));
576
577         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
578         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
579
580         // Execute the pipeline
581         StepVerifier.create(pipeline.executePipeline())
582                 .expectSubscription()
583                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
584                 .verifyComplete();
585
586         verify(aaiClientTask, times(2))
587                 .executePnfRetrieval(anyString(), anyString());
588         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
589         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
590     }
591
592     private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
593
594         // Build Relationship Data
595         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
596                 ImmutableRelationshipEntryAaiObject.builder()
597                         .relatedTo("service-instance")
598                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
599                                 + "/service-subscription/BBS-CFS/service-instances"
600                                 + "/service-instance/" + hsiCfsServiceInstanceId)
601                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
602                         .relationshipData(Arrays.asList(
603                                 ImmutableRelationshipDataEntryAaiObject.builder()
604                                         .relationshipKey("customer.global-customer-id")
605                                         .relationshipValue("Demonstration").build(),
606                                 ImmutableRelationshipDataEntryAaiObject.builder()
607                                         .relationshipKey("service-subscription.service-type")
608                                         .relationshipValue("BBS-CFS").build(),
609                                 ImmutableRelationshipDataEntryAaiObject.builder()
610                                         .relationshipKey("service-instance.service-instance-id")
611                                         .relationshipValue(hsiCfsServiceInstanceId).build())
612                         )
613                         .relatedToProperties(Collections.singletonList(
614                                 ImmutablePropertyAaiObject.builder()
615                                         .propertyKey("service-instance.service-instance-name")
616                                         .propertyValue("bbs-instance").build())
617                         )
618                         .build();
619
620         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
621                 .relationshipEntries(Collections.singletonList(relationshipEntry))
622                 .build();
623
624         // Finally construct PNF object data
625         return ImmutablePnfAaiObject.builder()
626                 .pnfName(pnfName)
627                 .isInMaintenance(true)
628                 .relationshipListAaiObject(relationshipListAaiObject)
629                 .build();
630     }
631
632     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
633                                                                           String pnfName,
634                                                                           String rgwMacAddress) {
635         String orchestrationStatus = "active";
636
637         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
638                 ImmutableRelationshipEntryAaiObject.builder()
639                         .relatedTo("pnf")
640                         .relatedLink("/pnfs/pnf/" + pnfName)
641                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
642                                 .relationshipKey("pnf.pnf-name")
643                                 .relationshipValue(pnfName).build()))
644                         .build();
645
646         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
647                 .relationshipEntries(Collections.singletonList(relationshipEntry))
648                 .build();
649
650         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
651                 ImmutableMetadataEntryAaiObject.builder()
652                         .metaname("rgw-mac-address")
653                         .metavalue(rgwMacAddress)
654                         .build();
655
656         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
657                 .metadataEntries(Collections.singletonList(metadataEntry))
658                 .build();
659
660         // Finally construct Service Instance object data
661         return ImmutableServiceInstanceAaiObject.builder()
662                 .serviceInstanceId(hsiCfsServiceInstanceId)
663                 .orchestrationStatus(orchestrationStatus)
664                 .relationshipListAaiObject(relationshipListAaiObject)
665                 .metadataListAaiObject(metadataListAaiObject)
666                 .build();
667     }
668 }