dbd1aab1c9aaac9c1bc45300546a3b61a0f0b478
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.pipelines;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
39
40 import javax.net.ssl.SSLException;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
51 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
52 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
55 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
59 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
60 import org.onap.bbs.event.processor.model.PnfAaiObject;
61 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
66 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
67 import org.springframework.http.HttpStatus;
68 import org.springframework.http.ResponseEntity;
69
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
73
74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
75 @SuppressWarnings("unchecked")
76 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
77 class ReRegistrationPipelineTest {
78
79     private ReRegistrationPipeline pipeline;
80     private ApplicationConfiguration configuration;
81     private DmaapReRegistrationConsumerTask consumerTask;
82     private DmaapPublisherTask publisherTask;
83     private AaiClientTask aaiClientTask;
84
85     private ResponseEntity<String> responseEntity;
86
87     @BeforeEach
88     void setup() {
89         // Runs before each test: builds fresh mocks for every collaborator and a new pipeline around them.
90         responseEntity = Mockito.mock(ResponseEntity.class);
91         // Collaborators are plain Mockito mocks; individual tests stub only the calls they need.
92         configuration = Mockito.mock(ApplicationConfiguration.class);
93         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
95         aaiClientTask = Mockito.mock(AaiClientTask.class);
96         // The close-loop control name and policy scope are stubbed identically for all tests.
97         when(configuration.getReRegistrationCloseLoopControlName())
98                 .thenReturn("controlName");
99         when(configuration.getReRegistrationCloseLoopPolicyScope())
100                 .thenReturn("policyScope");
101         // NOTE(review): the role of the trailing empty HashMap argument is not visible here - check the constructor.
102         pipeline = new ReRegistrationPipeline(configuration, consumerTask,
103                 publisherTask, aaiClientTask, new HashMap<>());
104     }
105
106     @Test
107     void handleEmptyResponseFromDmaap() throws SSLException {
108         // The consumer fails with EmptyDmaapResponseException; the pipeline is expected to complete without items.
109         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
110         when(consumerTask.execute(anyString()))
111                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
112         // No onNext signal is expected between subscription and completion.
113         StepVerifier.create(pipeline.executePipeline())
114                 .expectSubscription()
115                 .verifyComplete();
116         // Neither the A&AI client nor the publisher may have been called.
117         verifyZeroInteractions(aaiClientTask);
118         verifyZeroInteractions(publisherTask);
119     }
120
121     @Test
122     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
123         // The consumer never emits (Flux.never()) while the configured pipeline timeout is 1 second.
124         // Prepare mocks
125         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
126         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
127                 .thenReturn(Flux.never());
128         // NOTE(review): verifyComplete() sets no verification timeout, so a never-terminating pipeline would hang here.
129         // Execute pipeline: it is expected to complete without emitting any item
130         StepVerifier.create(pipeline.executePipeline())
131                 .expectSubscription()
132                 .verifyComplete();
133         // Nothing downstream of the consumer may have been called.
134         verifyZeroInteractions(aaiClientTask);
135         verifyZeroInteractions(publisherTask);
136     }
137
138     @Test
139     void noResponseFromAai_PipelineTimesOut() throws SSLException {
140         // One event is consumed, but the PNF retrieval never answers (Mono.never()); pipeline timeout is 1 second.
141         String pnfName = "olt1";
142         String attachmentPoint = "olt2-2-2";
143         String remoteId = "newRemoteId";
144         String cvlan = "1005";
145         String svlan = "100";
146
147         // Prepare stubbed replies
148         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
149                 .correlationId(pnfName)
150                 .attachmentPoint(attachmentPoint)
151                 .remoteId(remoteId)
152                 .cVlan(cvlan)
153                 .sVlan(svlan)
154                 .build();
155
156         // Prepare mocks
157         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
158         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
159         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
160
161         // Execute pipeline: it is expected to complete without emitting any item
162         StepVerifier.create(pipeline.executePipeline())
163                 .expectSubscription()
164                 .verifyComplete();
165         // The publisher must not have been reached.
166         verifyZeroInteractions(publisherTask);
167     }
168
169     @Test
170     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
171         // Both A&AI lookups succeed, but the publisher never answers (Mono.never()); pipeline timeout is 1 second.
172         String pnfName = "olt1";
173         String attachmentPoint = "olt2-2-2";
174         String remoteId = "newRemoteId";
175         String cvlan = "1005";
176         String svlan = "100";
177         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
178
179         // Prepare stubbed replies
180         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
181                 .correlationId(pnfName)
182                 .attachmentPoint(attachmentPoint)
183                 .remoteId(remoteId)
184                 .cVlan(cvlan)
185                 .sVlan(svlan)
186                 .build();
187         // The PNF is built with attachment point "olt1-1-1", which differs from the event's "olt2-2-2".
188         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
189         ServiceInstanceAaiObject hsiCfsServiceInstance =
190                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
191
192         // Prepare Mocks
193         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
194                 hsiCfsServiceInstance.getServiceInstanceId());
195
196         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
197         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
198
199         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
200                 .thenReturn(Mono.just(pnfAaiObject));
201         // The service-instance stub only matches the exact task name and URL built above.
202         when(aaiClientTask
203                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
204                 .thenReturn(Mono.just(hsiCfsServiceInstance));
205
206         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
207
208         // Execute the pipeline: it is expected to complete without emitting any item
209         StepVerifier.create(pipeline.executePipeline())
210                 .expectSubscription()
211                 .verifyComplete();
212         // The publisher was invoked exactly once, even though its reply never arrived.
213         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
214     }
215
216     @Test
217     void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
218         // One event whose PNF comes from constructPnfObjectWithoutLogicalLink; no publication is expected.
219         String pnfName = "olt1";
220         String attachmentPoint = "olt2-2-2";
221         String remoteId = "newRemoteId";
222         String cvlan = "1005";
223         String svlan = "100";
224         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
225
226         // Prepare stubbed replies
227         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
228                 .correlationId(pnfName)
229                 .attachmentPoint(attachmentPoint)
230                 .remoteId(remoteId)
231                 .cVlan(cvlan)
232                 .sVlan(svlan)
233                 .build();
234         // NOTE(review): helper not shown here; presumably it omits the "logical-link" relationship - confirm.
235         PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
236         ServiceInstanceAaiObject hsiCfsServiceInstance =
237                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
238
239         // Prepare Mocks
240         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
241                 hsiCfsServiceInstance.getServiceInstanceId());
242
243         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
244         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
245
246         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
247                 .thenReturn(Mono.just(pnfAaiObject));
248         // The stubs below are prepared, but the verifications at the end expect them to remain unused.
249         when(aaiClientTask
250                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
251                 .thenReturn(Mono.just(hsiCfsServiceInstance));
252
253         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
254         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
255
256         // Execute the pipeline: it is expected to complete without emitting any item
257         StepVerifier.create(pipeline.executePipeline())
258                 .expectSubscription()
259                 .verifyComplete();
260         // Only the PNF retrieval may have happened: no service-instance lookup and no publishing.
261         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
262         verifyNoMoreInteractions(aaiClientTask);
263         verifyZeroInteractions(publisherTask);
264     }
265
266     @Test
267     void singleCorrectEvent_handleSuccessfully() throws SSLException {
268         // Happy path: one event, both A&AI lookups succeed and the publisher replies with HTTP 200.
269         String pnfName = "olt1";
270         String attachmentPoint = "olt2-2-2";
271         String remoteId = "newRemoteId";
272         String cvlan = "1005";
273         String svlan = "100";
274         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
275
276         // Prepare stubbed replies
277         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
278                 .correlationId(pnfName)
279                 .attachmentPoint(attachmentPoint)
280                 .remoteId(remoteId)
281                 .cVlan(cvlan)
282                 .sVlan(svlan)
283                 .build();
284         // The PNF is built with "old-attachment-point", which differs from the event's attachment point.
285         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
286         ServiceInstanceAaiObject hsiCfsServiceInstance =
287                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
288
289         // Prepare Mocks
290         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
291                 hsiCfsServiceInstance.getServiceInstanceId());
292
293         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
294         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
295
296         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
297                 .thenReturn(Mono.just(pnfAaiObject));
298
299         when(aaiClientTask
300                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
301                 .thenReturn(Mono.just(hsiCfsServiceInstance));
302         // The mocked publisher response reports HTTP 200 (OK).
303         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
304         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
305
306         // Execute the pipeline: exactly one item with status OK is expected before completion
307         StepVerifier.create(pipeline.executePipeline())
308                 .expectSubscription()
309                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
310                 .verifyComplete();
311         // Exactly one control-loop message was handed to the publisher.
312         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
313     }
314
315     @Test
316     void twoCorrectEvents_handleSuccessfully() throws SSLException {
317         // Two events for different PNFs, each with its own A&AI replies; two OK results are expected.
318         String pnfName1 = "olt1";
319         String pnfName2 = "olt2";
320         String attachmentPoint1 = "olt1-1-1";
321         String attachmentPoint2 = "olt2-2-2";
322         String remoteId1 = "newRemoteId1";
323         String remoteId2 = "newRemoteId2";
324         String cvlan1 = "1005";
325         String cvlan2 = "1006";
326         String svlan = "100";
327         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
328         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
329
330         // Prepare stubbed replies
331         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
332                 .correlationId(pnfName1)
333                 .attachmentPoint(attachmentPoint1)
334                 .remoteId(remoteId1)
335                 .cVlan(cvlan1)
336                 .sVlan(svlan)
337                 .build();
338         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
339                 .correlationId(pnfName2)
340                 .attachmentPoint(attachmentPoint2)
341                 .remoteId(remoteId2)
342                 .cVlan(cvlan2)
343                 .sVlan(svlan)
344                 .build();
345         // Each PNF is built with an attachment point that differs from the one carried by its event.
346         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
347         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
348         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
349                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
350         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
351                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
352
353         // Prepare Mocks
354         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
355         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
356         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
357                 hsiCfsServiceInstance1.getServiceInstanceId());
358         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
359                 hsiCfsServiceInstance2.getServiceInstanceId());
360
361         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
362         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
363                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
364         // PNF retrieval is stubbed per exact URL, so each PNF name maps to its own PNF object.
365         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
366         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
367
368         when(aaiClientTask
369                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
370                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
371         when(aaiClientTask
372                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
373                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
374
375         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
376         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
377
378         // Execute the pipeline: two items with status OK are expected before completion
379         StepVerifier.create(pipeline.executePipeline())
380                 .expectSubscription()
381                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
382                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
383                 .verifyComplete();
384         // The publisher was invoked once per event.
385         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
386     }
387
388     @Test
389     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
390         // The PNF retrieval fails with AaiTaskException; the pipeline is expected to complete without items.
391         String pnfName = "olt1";
392         String attachmentPoint = "olt2-2-2";
393         String remoteId = "newRemoteId";
394         String cvlan = "1005";
395         String svlan = "100";
396
397         // Prepare stubbed replies
398         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
399                 .correlationId(pnfName)
400                 .attachmentPoint(attachmentPoint)
401                 .remoteId(remoteId)
402                 .cVlan(cvlan)
403                 .sVlan(svlan)
404                 .build();
405
406         // Prepare Mocks
407         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
408         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
409         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
410                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
411
412         // Execute the pipeline: the A&AI error must not surface as an error signal
413         StepVerifier.create(pipeline.executePipeline())
414                 .expectSubscription()
415                 .verifyComplete();
416         // Exactly one PNF retrieval, no other A&AI call, and no publishing.
417         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
418         verifyNoMoreInteractions(aaiClientTask);
419         verifyZeroInteractions(publisherTask);
420     }
421
422     @Test
423     void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
424         // Two events; the second PNF already has the attachment point its event reports. One publication expected.
425         String pnfName1 = "olt1";
426         String pnfName2 = "olt2";
427         String attachmentPoint1 = "olt1-1-1";
428         String attachmentPoint2 = "olt2-2-2";
429         String remoteId1 = "newRemoteId1";
430         String remoteId2 = "newRemoteId2";
431         String cvlan1 = "1005";
432         String cvlan2 = "1006";
433         String svlan = "100";
434         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
435         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
436
437         // Prepare stubbed replies
438         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
439                 .correlationId(pnfName1)
440                 .attachmentPoint(attachmentPoint1)
441                 .remoteId(remoteId1)
442                 .cVlan(cvlan1)
443                 .sVlan(svlan)
444                 .build();
445         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
446                 .correlationId(pnfName2)
447                 .attachmentPoint(attachmentPoint2)
448                 .remoteId(remoteId2)
449                 .cVlan(cvlan2)
450                 .sVlan(svlan)
451                 .build();
452         // pnfAaiObject1 differs from its event ("olt1-1-0" vs "olt1-1-1"); pnfAaiObject2 reuses attachmentPoint2.
453         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
454         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
455         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
456                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
457         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
458                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
459
460         // Prepare Mocks
461         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
462         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
463         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
464                 hsiCfsServiceInstance1.getServiceInstanceId());
465         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
466                 hsiCfsServiceInstance2.getServiceInstanceId());
467
468         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
469         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
470                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
471         // PNF retrieval is stubbed per exact URL, so each PNF name maps to its own PNF object.
472         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
473         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
474
475         when(aaiClientTask
476                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
477                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
478         when(aaiClientTask
479                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
480                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
481
482         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
483         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
484
485         // Execute the pipeline: a single item with status OK is expected before completion
486         StepVerifier.create(pipeline.executePipeline())
487                 .expectSubscription()
488                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
489                 .verifyComplete();
490         // Only one of the two events resulted in a call to the publisher.
491         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
492     }
493
494     @Test
495     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
496         // Two events: the first PNF retrieval succeeds, the second fails; exactly one publication is expected.
497         String pnfName1 = "olt1";
498         String pnfName2 = "olt2";
499         String attachmentPoint1 = "olt1-1-1";
500         String attachmentPoint2 = "olt2-2-2";
501         String remoteId1 = "newRemoteId1";
502         String remoteId2 = "newRemoteId2";
503         String cvlan1 = "1005";
504         String cvlan2 = "1006";
505         String svlan = "100";
506         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
507
508         // Prepare stubbed replies
509         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
510                 .correlationId(pnfName1)
511                 .attachmentPoint(attachmentPoint1)
512                 .remoteId(remoteId1)
513                 .cVlan(cvlan1)
514                 .sVlan(svlan)
515                 .build();
516         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
517                 .correlationId(pnfName2)
518                 .attachmentPoint(attachmentPoint2)
519                 .remoteId(remoteId2)
520                 .cVlan(cvlan2)
521                 .sVlan(svlan)
522                 .build();
523         // Only the first event gets A&AI objects; they are built from pnfName1 and cvlan1.
524         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
525         ServiceInstanceAaiObject hsiCfsServiceInstance =
526                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
527
528         // Prepare Mocks
529         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
530                 hsiCfsServiceInstance.getServiceInstanceId());
531         // Consecutive thenReturn calls below: the first PNF retrieval succeeds, the second fails with AaiTaskException.
532         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
533         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
534                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
535         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
536                 .thenReturn(Mono.just(pnfAaiObject))
537                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
538         when(aaiClientTask
539                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
540                 .thenReturn(Mono.just(hsiCfsServiceInstance));
541
542         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
543         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
544
545         // Execute the pipeline: a single item with status OK is expected before completion
546         StepVerifier.create(pipeline.executePipeline())
547                 .expectSubscription()
548                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
549                 .verifyComplete();
550         // Both PNF retrievals were attempted; only one service-instance lookup and one publication followed.
551         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
552         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
553         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
554     }
555
556     @Test
557     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
558         // Two events: the first PNF retrieval fails, the second succeeds; exactly one publication is expected.
559         String pnfName1 = "olt1";
560         String pnfName2 = "olt2";
561         String attachmentPoint1 = "olt1-1-1";
562         String attachmentPoint2 = "olt2-2-2";
563         String remoteId1 = "newRemoteId1";
564         String remoteId2 = "newRemoteId2";
565         String cvlan1 = "1005";
566         String cvlan2 = "1006";
567         String svlan = "100";
568         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
569
570         // Prepare stubbed replies
571         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
572                 .correlationId(pnfName1)
573                 .attachmentPoint(attachmentPoint1)
574                 .remoteId(remoteId1)
575                 .cVlan(cvlan1)
576                 .sVlan(svlan)
577                 .build();
578         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
579                 .correlationId(pnfName2)
580                 .attachmentPoint(attachmentPoint2)
581                 .remoteId(remoteId2)
582                 .cVlan(cvlan2)
583                 .sVlan(svlan)
584                 .build();
585         // Only the second event gets A&AI objects; they are built from pnfName2 and cvlan2.
586         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
587         ServiceInstanceAaiObject hsiCfsServiceInstance =
588                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
589
590         // Prepare Mocks
591         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
592                 hsiCfsServiceInstance.getServiceInstanceId());
593         // Consecutive thenReturn calls below: the first PNF retrieval fails with AaiTaskException, the second succeeds.
594         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
595         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
596                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
597         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
598                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
599                 .thenReturn(Mono.just(pnfAaiObject));
600         when(aaiClientTask
601                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
602                 .thenReturn(Mono.just(hsiCfsServiceInstance));
603
604         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
605         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
606
607         // Execute the pipeline: a single item with status OK is expected before completion
608         StepVerifier.create(pipeline.executePipeline())
609                 .expectSubscription()
610                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
611                 .verifyComplete();
612         // Both PNF retrievals were attempted; only one service-instance lookup and one publication followed.
613         verify(aaiClientTask, times(2))
614                 .executePnfRetrieval(anyString(), anyString());
615         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
616         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
617     }
618
619     private PnfAaiObject constructPnfObject(String pnfName, String attachmentPoint,
620                                             String hsiCfsServiceInstanceId) {
621
622         // Build Relationship Data
623         RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
624                 ImmutableRelationshipEntryAaiObject.builder()
625                         .relatedTo("service-instance")
626                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
627                                 + "/service-subscription/BBS-CFS/service-instances"
628                                 + "/service-instance/" + hsiCfsServiceInstanceId)
629                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
630                         .relationshipData(Arrays.asList(
631                                 ImmutableRelationshipDataEntryAaiObject.builder()
632                                         .relationshipKey("customer.global-customer-id")
633                                         .relationshipValue("Demonstration").build(),
634                                 ImmutableRelationshipDataEntryAaiObject.builder()
635                                         .relationshipKey("service-subscription.service-type")
636                                         .relationshipValue("BBS-CFS").build(),
637                                 ImmutableRelationshipDataEntryAaiObject.builder()
638                                         .relationshipKey("service-instance.service-instance-id")
639                                         .relationshipValue(hsiCfsServiceInstanceId).build())
640                         )
641                         .relatedToProperties(Collections.singletonList(
642                                 ImmutablePropertyAaiObject.builder()
643                                         .propertyKey("service-instance.service-instance-name")
644                                         .propertyValue("bbs-instance").build())
645                         )
646                         .build();
647
648         RelationshipListAaiObject.RelationshipEntryAaiObject secondRelationshipEntry =
649                 ImmutableRelationshipEntryAaiObject.builder()
650                         .relatedTo("logical-link")
651                         .relatedLink("/network/logical-links/logical-link/" + attachmentPoint)
652                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
653                                 .relationshipKey("logical-link.link-name")
654                                 .relationshipValue(attachmentPoint).build()))
655                         .build();
656
657         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
658                 .relationshipEntries(Arrays.asList(firstRelationshipEntry, secondRelationshipEntry))
659                 .build();
660
661         // Finally construct PNF object data
662         return ImmutablePnfAaiObject.builder()
663                 .pnfName(pnfName)
664                 .isInMaintenance(true)
665                 .relationshipListAaiObject(relationshipListAaiObject)
666                 .build();
667     }
668
669     private PnfAaiObject constructPnfObjectWithoutLogicalLink(String pnfName, String hsiCfsServiceInstanceId) {
670
671         // Build Relationship Data
672         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
673                 ImmutableRelationshipEntryAaiObject.builder()
674                         .relatedTo("service-instance")
675                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
676                                 + "/service-subscription/BBS-CFS/service-instances"
677                                 + "/service-instance/" + hsiCfsServiceInstanceId)
678                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
679                         .relationshipData(Arrays.asList(
680                                 ImmutableRelationshipDataEntryAaiObject.builder()
681                                         .relationshipKey("customer.global-customer-id")
682                                         .relationshipValue("Demonstration").build(),
683                                 ImmutableRelationshipDataEntryAaiObject.builder()
684                                         .relationshipKey("service-subscription.service-type")
685                                         .relationshipValue("BBS-CFS").build(),
686                                 ImmutableRelationshipDataEntryAaiObject.builder()
687                                         .relationshipKey("service-instance.service-instance-id")
688                                         .relationshipValue(hsiCfsServiceInstanceId).build())
689                         )
690                         .relatedToProperties(Collections.singletonList(
691                                 ImmutablePropertyAaiObject.builder()
692                                         .propertyKey("service-instance.service-instance-name")
693                                         .propertyValue("bbs-instance").build())
694                         )
695                         .build();
696
697         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
698                 .relationshipEntries(Collections.singletonList(relationshipEntry))
699                 .build();
700
701         // Finally construct PNF object data
702         return ImmutablePnfAaiObject.builder()
703                 .pnfName(pnfName)
704                 .isInMaintenance(true)
705                 .relationshipListAaiObject(relationshipListAaiObject)
706                 .build();
707     }
708
709     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
710                                                                              String pnfName,
711                                                                              String cvlan) {
712         String orchestrationStatus = "active";
713
714         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
715                 ImmutableRelationshipEntryAaiObject.builder()
716                         .relatedTo("pnf")
717                         .relatedLink("/pnfs/pnf/" + pnfName)
718                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
719                                 .relationshipKey("pnf.pnf-name")
720                                 .relationshipValue(pnfName).build()))
721                         .build();
722
723         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
724                 .relationshipEntries(Collections.singletonList(relationshipEntry))
725                 .build();
726
727         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
728                 ImmutableMetadataEntryAaiObject.builder()
729                         .metaname("cvlan")
730                         .metavalue(cvlan)
731                         .build();
732
733         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
734                 .metadataEntries(Collections.singletonList(metadataEntry))
735                 .build();
736
737         // Finally construct Service Instance object data
738         return ImmutableServiceInstanceAaiObject.builder()
739                 .serviceInstanceId(hsiCfsServiceInstanceId)
740                 .orchestrationStatus(orchestrationStatus)
741                 .relationshipListAaiObject(relationshipListAaiObject)
742                 .metadataListAaiObject(metadataListAaiObject)
743                 .build();
744     }
745 }