a1b6b14894bd963cc60bc1139fda9e537afe3a7c
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.pipelines;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
39
40 import javax.net.ssl.SSLException;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
51 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
52 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
55 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
59 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
60 import org.onap.bbs.event.processor.model.PnfAaiObject;
61 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
66 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
67 import org.springframework.http.HttpStatus;
68 import org.springframework.http.ResponseEntity;
69
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
73
74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
75 @SuppressWarnings("unchecked")
76 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
77 class ReRegistrationPipelineTest {
78
79     private ReRegistrationPipeline pipeline;
80     private ApplicationConfiguration configuration;
81     private DmaapReRegistrationConsumerTask consumerTask;
82     private DmaapPublisherTask publisherTask;
83     private AaiClientTask aaiClientTask;
84
85     private ResponseEntity<String> responseEntity;
86
87     @BeforeEach
88     void setup() {
89
90         responseEntity = Mockito.mock(ResponseEntity.class);
91
92         configuration = Mockito.mock(ApplicationConfiguration.class);
93         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
95         aaiClientTask = Mockito.mock(AaiClientTask.class);
96
97         when(configuration.getReRegistrationCloseLoopControlName())
98                 .thenReturn("controlName");
99         when(configuration.getReRegistrationCloseLoopPolicyScope())
100                 .thenReturn("policyScope");
101         when(configuration.getPolicyVersion())
102                 .thenReturn("1.0.0");
103         when(configuration.getCloseLoopTargetType())
104                 .thenReturn("VM");
105         when(configuration.getCloseLoopEventStatus())
106                 .thenReturn("ONSET");
107         when(configuration.getCloseLoopVersion())
108                 .thenReturn("1.0.2");
109         when(configuration.getCloseLoopTarget())
110                 .thenReturn("CL-Target");
111         when(configuration.getCloseLoopOriginator())
112                 .thenReturn("DCAE-BBS-ep");
113
114         pipeline = new ReRegistrationPipeline(configuration, consumerTask,
115                 publisherTask, aaiClientTask, new HashMap<>());
116     }
117
118     @Test
119     void handleEmptyResponseFromDmaap() throws SSLException {
120
121         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
122         when(consumerTask.execute(anyString()))
123                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
124
125         StepVerifier.create(pipeline.executePipeline())
126                 .expectSubscription()
127                 .verifyComplete();
128
129         verifyZeroInteractions(aaiClientTask);
130         verifyZeroInteractions(publisherTask);
131     }
132
133     @Test
134     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
135
136         // Prepare mocks
137         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
138         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
139                 .thenReturn(Flux.never());
140
141         // Execute pipeline
142         StepVerifier.create(pipeline.executePipeline())
143                 .expectSubscription()
144                 .verifyComplete();
145
146         verifyZeroInteractions(aaiClientTask);
147         verifyZeroInteractions(publisherTask);
148     }
149
150     @Test
151     void noResponseFromAai_PipelineTimesOut() throws SSLException {
152
153         String pnfName = "olt1";
154         String attachmentPoint = "olt2-2-2";
155         String remoteId = "newRemoteId";
156         String cvlan = "1005";
157         String svlan = "100";
158
159         // Prepare stubbed replies
160         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
161                 .correlationId(pnfName)
162                 .attachmentPoint(attachmentPoint)
163                 .remoteId(remoteId)
164                 .cVlan(cvlan)
165                 .sVlan(svlan)
166                 .build();
167
168         // Prepare mocks
169         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
170         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
171         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
172
173         // Execute pipeline
174         StepVerifier.create(pipeline.executePipeline())
175                 .expectSubscription()
176                 .verifyComplete();
177
178         verifyZeroInteractions(publisherTask);
179     }
180
181     @Test
182     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
183
184         String pnfName = "olt1";
185         String attachmentPoint = "olt2-2-2";
186         String remoteId = "newRemoteId";
187         String cvlan = "1005";
188         String svlan = "100";
189         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
190
191         // Prepare stubbed replies
192         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
193                 .correlationId(pnfName)
194                 .attachmentPoint(attachmentPoint)
195                 .remoteId(remoteId)
196                 .cVlan(cvlan)
197                 .sVlan(svlan)
198                 .build();
199
200         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
201         ServiceInstanceAaiObject hsiCfsServiceInstance =
202                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
203
204         // Prepare Mocks
205         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
206                 hsiCfsServiceInstance.getServiceInstanceId());
207
208         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
209         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
210
211         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
212                 .thenReturn(Mono.just(pnfAaiObject));
213
214         when(aaiClientTask
215                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
216                 .thenReturn(Mono.just(hsiCfsServiceInstance));
217
218         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
219
220         // Execute the pipeline
221         StepVerifier.create(pipeline.executePipeline())
222                 .expectSubscription()
223                 .verifyComplete();
224
225         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
226     }
227
228     @Test
229     void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
230
231         String pnfName = "olt1";
232         String attachmentPoint = "olt2-2-2";
233         String remoteId = "newRemoteId";
234         String cvlan = "1005";
235         String svlan = "100";
236         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
237
238         // Prepare stubbed replies
239         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
240                 .correlationId(pnfName)
241                 .attachmentPoint(attachmentPoint)
242                 .remoteId(remoteId)
243                 .cVlan(cvlan)
244                 .sVlan(svlan)
245                 .build();
246
247         PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
248         ServiceInstanceAaiObject hsiCfsServiceInstance =
249                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
250
251         // Prepare Mocks
252         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
253                 hsiCfsServiceInstance.getServiceInstanceId());
254
255         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
256         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
257
258         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
259                 .thenReturn(Mono.just(pnfAaiObject));
260
261         when(aaiClientTask
262                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
263                 .thenReturn(Mono.just(hsiCfsServiceInstance));
264
265         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
266         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
267
268         // Execute the pipeline
269         StepVerifier.create(pipeline.executePipeline())
270                 .expectSubscription()
271                 .verifyComplete();
272
273         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
274         verifyNoMoreInteractions(aaiClientTask);
275         verifyZeroInteractions(publisherTask);
276     }
277
278     @Test
279     void singleCorrectEvent_handleSuccessfully() throws SSLException {
280
281         String pnfName = "olt1";
282         String attachmentPoint = "olt2-2-2";
283         String remoteId = "newRemoteId";
284         String cvlan = "1005";
285         String svlan = "100";
286         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
287
288         // Prepare stubbed replies
289         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
290                 .correlationId(pnfName)
291                 .attachmentPoint(attachmentPoint)
292                 .remoteId(remoteId)
293                 .cVlan(cvlan)
294                 .sVlan(svlan)
295                 .build();
296
297         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
298         ServiceInstanceAaiObject hsiCfsServiceInstance =
299                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
300
301         // Prepare Mocks
302         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
303                 hsiCfsServiceInstance.getServiceInstanceId());
304
305         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
306         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
307
308         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
309                 .thenReturn(Mono.just(pnfAaiObject));
310
311         when(aaiClientTask
312                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
313                 .thenReturn(Mono.just(hsiCfsServiceInstance));
314
315         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
316         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
317
318         // Execute the pipeline
319         StepVerifier.create(pipeline.executePipeline())
320                 .expectSubscription()
321                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
322                 .verifyComplete();
323
324         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
325     }
326
327     @Test
328     void twoCorrectEvents_handleSuccessfully() throws SSLException {
329
330         String pnfName1 = "olt1";
331         String pnfName2 = "olt2";
332         String attachmentPoint1 = "olt1-1-1";
333         String attachmentPoint2 = "olt2-2-2";
334         String remoteId1 = "newRemoteId1";
335         String remoteId2 = "newRemoteId2";
336         String cvlan1 = "1005";
337         String cvlan2 = "1006";
338         String svlan = "100";
339         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
340         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
341
342         // Prepare stubbed replies
343         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
344                 .correlationId(pnfName1)
345                 .attachmentPoint(attachmentPoint1)
346                 .remoteId(remoteId1)
347                 .cVlan(cvlan1)
348                 .sVlan(svlan)
349                 .build();
350         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
351                 .correlationId(pnfName2)
352                 .attachmentPoint(attachmentPoint2)
353                 .remoteId(remoteId2)
354                 .cVlan(cvlan2)
355                 .sVlan(svlan)
356                 .build();
357
358         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
359         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
360         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
361                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
362         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
363                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
364
365         // Prepare Mocks
366         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
367         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
368         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
369                 hsiCfsServiceInstance1.getServiceInstanceId());
370         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
371                 hsiCfsServiceInstance2.getServiceInstanceId());
372
373         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
374         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
375                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
376
377         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
378         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
379
380         when(aaiClientTask
381                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
382                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
383         when(aaiClientTask
384                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
385                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
386
387         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
388         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
389
390         // Execute the pipeline
391         StepVerifier.create(pipeline.executePipeline())
392                 .expectSubscription()
393                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
394                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
395                 .verifyComplete();
396
397         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
398     }
399
400     @Test
401     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
402
403         String pnfName = "olt1";
404         String attachmentPoint = "olt2-2-2";
405         String remoteId = "newRemoteId";
406         String cvlan = "1005";
407         String svlan = "100";
408
409         // Prepare stubbed replies
410         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
411                 .correlationId(pnfName)
412                 .attachmentPoint(attachmentPoint)
413                 .remoteId(remoteId)
414                 .cVlan(cvlan)
415                 .sVlan(svlan)
416                 .build();
417
418         // Prepare Mocks
419         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
420         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
421         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
422                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
423
424         // Execute the pipeline
425         StepVerifier.create(pipeline.executePipeline())
426                 .expectSubscription()
427                 .verifyComplete();
428
429         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
430         verifyNoMoreInteractions(aaiClientTask);
431         verifyZeroInteractions(publisherTask);
432     }
433
434     @Test
435     void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
436
437         String pnfName1 = "olt1";
438         String pnfName2 = "olt2";
439         String attachmentPoint1 = "olt1-1-1";
440         String attachmentPoint2 = "olt2-2-2";
441         String remoteId1 = "newRemoteId1";
442         String remoteId2 = "newRemoteId2";
443         String cvlan1 = "1005";
444         String cvlan2 = "1006";
445         String svlan = "100";
446         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
447         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
448
449         // Prepare stubbed replies
450         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
451                 .correlationId(pnfName1)
452                 .attachmentPoint(attachmentPoint1)
453                 .remoteId(remoteId1)
454                 .cVlan(cvlan1)
455                 .sVlan(svlan)
456                 .build();
457         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
458                 .correlationId(pnfName2)
459                 .attachmentPoint(attachmentPoint2)
460                 .remoteId(remoteId2)
461                 .cVlan(cvlan2)
462                 .sVlan(svlan)
463                 .build();
464
465         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
466         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
467         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
468                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
469         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
470                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
471
472         // Prepare Mocks
473         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
474         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
475         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
476                 hsiCfsServiceInstance1.getServiceInstanceId());
477         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
478                 hsiCfsServiceInstance2.getServiceInstanceId());
479
480         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
481         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
482                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
483
484         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
485         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
486
487         when(aaiClientTask
488                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
489                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
490         when(aaiClientTask
491                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
492                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
493
494         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
495         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
496
497         // Execute the pipeline
498         StepVerifier.create(pipeline.executePipeline())
499                 .expectSubscription()
500                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
501                 .verifyComplete();
502
503         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
504     }
505
506     @Test
507     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
508
509         String pnfName1 = "olt1";
510         String pnfName2 = "olt2";
511         String attachmentPoint1 = "olt1-1-1";
512         String attachmentPoint2 = "olt2-2-2";
513         String remoteId1 = "newRemoteId1";
514         String remoteId2 = "newRemoteId2";
515         String cvlan1 = "1005";
516         String cvlan2 = "1006";
517         String svlan = "100";
518         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
519
520         // Prepare stubbed replies
521         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
522                 .correlationId(pnfName1)
523                 .attachmentPoint(attachmentPoint1)
524                 .remoteId(remoteId1)
525                 .cVlan(cvlan1)
526                 .sVlan(svlan)
527                 .build();
528         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
529                 .correlationId(pnfName2)
530                 .attachmentPoint(attachmentPoint2)
531                 .remoteId(remoteId2)
532                 .cVlan(cvlan2)
533                 .sVlan(svlan)
534                 .build();
535
536         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
537         ServiceInstanceAaiObject hsiCfsServiceInstance =
538                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
539
540         // Prepare Mocks
541         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
542                 hsiCfsServiceInstance.getServiceInstanceId());
543
544         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
545         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
546                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
547         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
548                 .thenReturn(Mono.just(pnfAaiObject))
549                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
550         when(aaiClientTask
551                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
552                 .thenReturn(Mono.just(hsiCfsServiceInstance));
553
554         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
555         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
556
557         // Execute the pipeline
558         StepVerifier.create(pipeline.executePipeline())
559                 .expectSubscription()
560                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
561                 .verifyComplete();
562
563         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
564         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
565         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
566     }
567
568     @Test
569     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
570
571         String pnfName1 = "olt1";
572         String pnfName2 = "olt2";
573         String attachmentPoint1 = "olt1-1-1";
574         String attachmentPoint2 = "olt2-2-2";
575         String remoteId1 = "newRemoteId1";
576         String remoteId2 = "newRemoteId2";
577         String cvlan1 = "1005";
578         String cvlan2 = "1006";
579         String svlan = "100";
580         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
581
582         // Prepare stubbed replies
583         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
584                 .correlationId(pnfName1)
585                 .attachmentPoint(attachmentPoint1)
586                 .remoteId(remoteId1)
587                 .cVlan(cvlan1)
588                 .sVlan(svlan)
589                 .build();
590         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
591                 .correlationId(pnfName2)
592                 .attachmentPoint(attachmentPoint2)
593                 .remoteId(remoteId2)
594                 .cVlan(cvlan2)
595                 .sVlan(svlan)
596                 .build();
597
598         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
599         ServiceInstanceAaiObject hsiCfsServiceInstance =
600                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
601
602         // Prepare Mocks
603         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
604                 hsiCfsServiceInstance.getServiceInstanceId());
605
606         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
607         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
608                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
609         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
610                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
611                 .thenReturn(Mono.just(pnfAaiObject));
612         when(aaiClientTask
613                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
614                 .thenReturn(Mono.just(hsiCfsServiceInstance));
615
616         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
617         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
618
619         // Execute the pipeline
620         StepVerifier.create(pipeline.executePipeline())
621                 .expectSubscription()
622                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
623                 .verifyComplete();
624
625         verify(aaiClientTask, times(2))
626                 .executePnfRetrieval(anyString(), anyString());
627         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
628         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
629     }
630
631     private PnfAaiObject constructPnfObject(String pnfName, String attachmentPoint,
632                                             String hsiCfsServiceInstanceId) {
633
634         // Build Relationship Data
635         RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
636                 ImmutableRelationshipEntryAaiObject.builder()
637                         .relatedTo("service-instance")
638                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
639                                 + "/service-subscription/BBS-CFS/service-instances"
640                                 + "/service-instance/" + hsiCfsServiceInstanceId)
641                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
642                         .relationshipData(Arrays.asList(
643                                 ImmutableRelationshipDataEntryAaiObject.builder()
644                                         .relationshipKey("customer.global-customer-id")
645                                         .relationshipValue("Demonstration").build(),
646                                 ImmutableRelationshipDataEntryAaiObject.builder()
647                                         .relationshipKey("service-subscription.service-type")
648                                         .relationshipValue("BBS-CFS").build(),
649                                 ImmutableRelationshipDataEntryAaiObject.builder()
650                                         .relationshipKey("service-instance.service-instance-id")
651                                         .relationshipValue(hsiCfsServiceInstanceId).build())
652                         )
653                         .relatedToProperties(Collections.singletonList(
654                                 ImmutablePropertyAaiObject.builder()
655                                         .propertyKey("service-instance.service-instance-name")
656                                         .propertyValue("bbs-instance").build())
657                         )
658                         .build();
659
660         RelationshipListAaiObject.RelationshipEntryAaiObject secondRelationshipEntry =
661                 ImmutableRelationshipEntryAaiObject.builder()
662                         .relatedTo("logical-link")
663                         .relatedLink("/network/logical-links/logical-link/" + attachmentPoint)
664                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
665                                 .relationshipKey("logical-link.link-name")
666                                 .relationshipValue(attachmentPoint).build()))
667                         .build();
668
669         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
670                 .relationshipEntries(Arrays.asList(firstRelationshipEntry, secondRelationshipEntry))
671                 .build();
672
673         // Finally construct PNF object data
674         return ImmutablePnfAaiObject.builder()
675                 .pnfName(pnfName)
676                 .isInMaintenance(true)
677                 .relationshipListAaiObject(relationshipListAaiObject)
678                 .build();
679     }
680
681     private PnfAaiObject constructPnfObjectWithoutLogicalLink(String pnfName, String hsiCfsServiceInstanceId) {
682
683         // Build Relationship Data
684         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
685                 ImmutableRelationshipEntryAaiObject.builder()
686                         .relatedTo("service-instance")
687                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
688                                 + "/service-subscription/BBS-CFS/service-instances"
689                                 + "/service-instance/" + hsiCfsServiceInstanceId)
690                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
691                         .relationshipData(Arrays.asList(
692                                 ImmutableRelationshipDataEntryAaiObject.builder()
693                                         .relationshipKey("customer.global-customer-id")
694                                         .relationshipValue("Demonstration").build(),
695                                 ImmutableRelationshipDataEntryAaiObject.builder()
696                                         .relationshipKey("service-subscription.service-type")
697                                         .relationshipValue("BBS-CFS").build(),
698                                 ImmutableRelationshipDataEntryAaiObject.builder()
699                                         .relationshipKey("service-instance.service-instance-id")
700                                         .relationshipValue(hsiCfsServiceInstanceId).build())
701                         )
702                         .relatedToProperties(Collections.singletonList(
703                                 ImmutablePropertyAaiObject.builder()
704                                         .propertyKey("service-instance.service-instance-name")
705                                         .propertyValue("bbs-instance").build())
706                         )
707                         .build();
708
709         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
710                 .relationshipEntries(Collections.singletonList(relationshipEntry))
711                 .build();
712
713         // Finally construct PNF object data
714         return ImmutablePnfAaiObject.builder()
715                 .pnfName(pnfName)
716                 .isInMaintenance(true)
717                 .relationshipListAaiObject(relationshipListAaiObject)
718                 .build();
719     }
720
721     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
722                                                                              String pnfName,
723                                                                              String cvlan) {
724         String orchestrationStatus = "active";
725
726         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
727                 ImmutableRelationshipEntryAaiObject.builder()
728                         .relatedTo("pnf")
729                         .relatedLink("/pnfs/pnf/" + pnfName)
730                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
731                                 .relationshipKey("pnf.pnf-name")
732                                 .relationshipValue(pnfName).build()))
733                         .build();
734
735         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
736                 .relationshipEntries(Collections.singletonList(relationshipEntry))
737                 .build();
738
739         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
740                 ImmutableMetadataEntryAaiObject.builder()
741                         .metaname("cvlan")
742                         .metavalue(cvlan)
743                         .build();
744
745         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
746                 .metadataEntries(Collections.singletonList(metadataEntry))
747                 .build();
748
749         // Finally construct Service Instance object data
750         return ImmutableServiceInstanceAaiObject.builder()
751                 .serviceInstanceId(hsiCfsServiceInstanceId)
752                 .orchestrationStatus(orchestrationStatus)
753                 .relationshipListAaiObject(relationshipListAaiObject)
754                 .metadataListAaiObject(metadataListAaiObject)
755                 .build();
756     }
757 }