dbac5bf213968abdb56d3a57a792d274d0639e15
[dcaegen2/services.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.bbs.event.processor.pipelines;
22
23 import static org.junit.jupiter.api.Assertions.assertEquals;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.ArgumentMatchers.anyString;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.verifyNoMoreInteractions;
29 import static org.mockito.Mockito.verifyZeroInteractions;
30 import static org.mockito.Mockito.when;
31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.UUID;
39
40 import javax.net.ssl.SSLException;
41
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.DisplayName;
44 import org.junit.jupiter.api.Test;
45 import org.mockito.Mockito;
46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
61 import org.onap.bbs.event.processor.model.PnfAaiObject;
62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
67 import org.springframework.http.HttpStatus;
68 import org.springframework.http.ResponseEntity;
69
70 import reactor.core.publisher.Flux;
71 import reactor.core.publisher.Mono;
72 import reactor.test.StepVerifier;
73
74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
75 @SuppressWarnings("unchecked")
76 @DisplayName("CPE Authentication Pipeline Unit-Tests")
77 class CpeAuthenticationPipelineTest {
78
79     private CpeAuthenticationPipeline pipeline;
80     private ApplicationConfiguration configuration;
81     private DmaapCpeAuthenticationConsumerTask consumerTask;
82     private DmaapPublisherTask publisherTask;
83     private AaiClientTask aaiClientTask;
84
85     private ResponseEntity<String> responseEntity;
86
87     @BeforeEach
88     void setup() {
89
90         responseEntity = Mockito.mock(ResponseEntity.class);
91
92         configuration = Mockito.mock(ApplicationConfiguration.class);
93         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
95         aaiClientTask = Mockito.mock(AaiClientTask.class);
96
97         when(configuration.getCpeAuthenticationCloseLoopControlName())
98                 .thenReturn("controlName");
99         when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
100                 .thenReturn("policyScope");
101
102         pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
103                 publisherTask, aaiClientTask, new HashMap<>());
104     }
105
106     @Test
107     void handleEmptyResponseFromDmaap() throws SSLException {
108
109         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
110         when(consumerTask.execute(anyString()))
111                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
112
113         StepVerifier.create(pipeline.executePipeline())
114                 .expectSubscription()
115                 .verifyComplete();
116
117         verifyZeroInteractions(aaiClientTask);
118         verifyZeroInteractions(publisherTask);
119     }
120
121     @Test
122     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
123
124         // Prepare mocks
125         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
126         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
127                 .thenReturn(Flux.never());
128
129         // Execute pipeline
130         StepVerifier.create(pipeline.executePipeline())
131                 .expectSubscription()
132                 .verifyComplete();
133
134         verifyZeroInteractions(aaiClientTask);
135         verifyZeroInteractions(publisherTask);
136     }
137
138     @Test
139     void noResponseFromAai_PipelineTimesOut() throws SSLException {
140
141         String pnfName = "olt1";
142         final String oldAuthenticationState = "outOfService";
143         final String newAuthenticationState = "inService";
144         final String stateInterface = "stateInterface";
145         final String rgwMacAddress = "00:0a:95:8d:78:16";
146         final String swVersion = "1.2";
147
148         // Prepare stubbed replies
149         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
150                 .correlationId(pnfName)
151                 .oldAuthenticationState(oldAuthenticationState)
152                 .newAuthenticationState(newAuthenticationState)
153                 .stateInterface(stateInterface)
154                 .rgwMacAddress(rgwMacAddress)
155                 .swVersion(swVersion)
156                 .build();
157
158         // Prepare mocks
159         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
160         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
161         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
162
163         // Execute pipeline
164         StepVerifier.create(pipeline.executePipeline())
165                 .expectSubscription()
166                 .verifyComplete();
167
168         verifyZeroInteractions(publisherTask);
169     }
170
171     @Test
172     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
173
174         String pnfName = "olt1";
175         final String oldAuthenticationState = "outOfService";
176         final String newAuthenticationState = "inService";
177         final String stateInterface = "stateInterface";
178         final String rgwMacAddress = "00:0a:95:8d:78:16";
179         final String swVersion = "1.2";
180         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
181
182         // Prepare stubbed replies
183         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
184                 .correlationId(pnfName)
185                 .oldAuthenticationState(oldAuthenticationState)
186                 .newAuthenticationState(newAuthenticationState)
187                 .stateInterface(stateInterface)
188                 .rgwMacAddress(rgwMacAddress)
189                 .swVersion(swVersion)
190                 .build();
191
192         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
193         ServiceInstanceAaiObject hsiCfsServiceInstance =
194                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
195
196         // Prepare Mocks
197         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
198                 hsiCfsServiceInstance.getServiceInstanceId());
199
200         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
201         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
202
203         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
204                 .thenReturn(Mono.just(pnfAaiObject));
205
206         when(aaiClientTask
207                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
208                 .thenReturn(Mono.just(hsiCfsServiceInstance));
209
210         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
211
212         // Execute the pipeline
213         StepVerifier.create(pipeline.executePipeline())
214                 .expectSubscription()
215                 .verifyComplete();
216
217         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
218     }
219
220     @Test
221     void singleCorrectEvent_handleSuccessfully() throws SSLException {
222
223         String pnfName = "olt1";
224         final String oldAuthenticationState = "outOfService";
225         final String newAuthenticationState = "inService";
226         final String stateInterface = "stateInterface";
227         final String rgwMacAddress = "00:0a:95:8d:78:16";
228         final String swVersion = "1.2";
229         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
230
231         // Prepare stubbed replies
232         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
233                 .correlationId(pnfName)
234                 .oldAuthenticationState(oldAuthenticationState)
235                 .newAuthenticationState(newAuthenticationState)
236                 .stateInterface(stateInterface)
237                 .rgwMacAddress(rgwMacAddress)
238                 .swVersion(swVersion)
239                 .build();
240
241         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
242         ServiceInstanceAaiObject hsiCfsServiceInstance =
243                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
244
245         // Prepare Mocks
246         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
247                 hsiCfsServiceInstance.getServiceInstanceId());
248
249         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
250         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
251
252         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
253                 .thenReturn(Mono.just(pnfAaiObject));
254
255         when(aaiClientTask
256                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
257                 .thenReturn(Mono.just(hsiCfsServiceInstance));
258
259         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
260         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
261
262         // Execute the pipeline
263         StepVerifier.create(pipeline.executePipeline())
264                 .expectSubscription()
265                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
266                 .verifyComplete();
267
268         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
269     }
270
271     @Test
272     void twoCorrectEvents_handleSuccessfully() throws SSLException {
273
274         String pnfName1 = "olt1";
275         String pnfName2 = "olt2";
276         final String oldAuthenticationState = "outOfService";
277         final String newAuthenticationState = "inService";
278         final String stateInterface = "stateInterface";
279         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
280         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
281         final String swVersion = "1.2";
282         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
283         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
284
285         // Prepare stubbed replies
286         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
287                 .correlationId(pnfName1)
288                 .oldAuthenticationState(oldAuthenticationState)
289                 .newAuthenticationState(newAuthenticationState)
290                 .stateInterface(stateInterface)
291                 .rgwMacAddress(rgwMacAddress1)
292                 .swVersion(swVersion)
293                 .build();
294         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
295                 .correlationId(pnfName2)
296                 .oldAuthenticationState(oldAuthenticationState)
297                 .newAuthenticationState(newAuthenticationState)
298                 .stateInterface(stateInterface)
299                 .rgwMacAddress(rgwMacAddress2)
300                 .swVersion(swVersion)
301                 .build();
302
303         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
304         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
305         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
306                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
307         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
308                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
309
310         // Prepare Mocks
311         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
312         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
313         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
314                 hsiCfsServiceInstance1.getServiceInstanceId());
315         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
316                 hsiCfsServiceInstance2.getServiceInstanceId());
317
318         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
319         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
320                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
321
322         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
323         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
324
325         when(aaiClientTask
326                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
327                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
328         when(aaiClientTask
329                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
330                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
331
332         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
333         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
334
335         // Execute the pipeline
336         StepVerifier.create(pipeline.executePipeline())
337                 .expectSubscription()
338                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
339                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
340                 .verifyComplete();
341
342         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
343     }
344
345     @Test
346     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
347
348         String pnfName = "olt1";
349         final String oldAuthenticationState = "outOfService";
350         final String newAuthenticationState = "inService";
351         final String stateInterface = "stateInterface";
352         final String rgwMacAddress = "00:0a:95:8d:78:16";
353         final String swVersion = "1.2";
354
355         // Prepare stubbed replies
356         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
357                 .correlationId(pnfName)
358                 .oldAuthenticationState(oldAuthenticationState)
359                 .newAuthenticationState(newAuthenticationState)
360                 .stateInterface(stateInterface)
361                 .rgwMacAddress(rgwMacAddress)
362                 .swVersion(swVersion)
363                 .build();
364
365         // Prepare Mocks
366         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
367         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
368         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
369                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
370
371         // Execute the pipeline
372         StepVerifier.create(pipeline.executePipeline())
373                 .expectSubscription()
374                 .verifyComplete();
375
376         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
377         verifyNoMoreInteractions(aaiClientTask);
378         verifyZeroInteractions(publisherTask);
379     }
380
381     @Test
382     void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
383
384         String pnfName1 = "olt1";
385         String pnfName2 = "olt2";
386         final String oldAuthenticationState = "outOfService";
387         final String newAuthenticationState = "inService";
388         final String stateInterface = "stateInterface";
389         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
390         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
391         final String swVersion = "1.2";
392         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
393         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
394
395         // Prepare stubbed replies
396         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
397                 .correlationId(pnfName1)
398                 .oldAuthenticationState(oldAuthenticationState)
399                 .newAuthenticationState(newAuthenticationState)
400                 .stateInterface(stateInterface)
401                 .rgwMacAddress(rgwMacAddress1)
402                 .swVersion(swVersion)
403                 .build();
404         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
405                 .correlationId(pnfName2)
406                 .oldAuthenticationState(oldAuthenticationState)
407                 .newAuthenticationState(newAuthenticationState)
408                 .stateInterface(stateInterface)
409                 .rgwMacAddress(rgwMacAddress2)
410                 .swVersion(swVersion)
411                 .build();
412
413         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
414         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
415         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
416                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
417         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
418                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
419                         "Having unmatched RGW MAC address");
420
421         // Prepare Mocks
422         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
423         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
424         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
425                 hsiCfsServiceInstance1.getServiceInstanceId());
426         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
427                 hsiCfsServiceInstance2.getServiceInstanceId());
428
429         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
430         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
431                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
432
433         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
434         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
435
436         when(aaiClientTask
437                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
438                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
439         when(aaiClientTask
440                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
441                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
442
443         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
444         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
445
446         // Execute the pipeline
447         StepVerifier.create(pipeline.executePipeline())
448                 .expectSubscription()
449                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
450                 .verifyComplete();
451
452         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
453     }
454
455     @Test
456     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
457
458         String pnfName1 = "olt1";
459         String pnfName2 = "olt2";
460         final String oldAuthenticationState = "outOfService";
461         final String newAuthenticationState = "inService";
462         final String stateInterface = "stateInterface";
463         final String rgwMacAddress = "00:0a:95:8d:78:16";
464         final String swVersion = "1.2";
465         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
466
467         // Prepare stubbed replies
468         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
469                 .correlationId(pnfName1)
470                 .oldAuthenticationState(oldAuthenticationState)
471                 .newAuthenticationState(newAuthenticationState)
472                 .stateInterface(stateInterface)
473                 .rgwMacAddress(rgwMacAddress)
474                 .swVersion(swVersion)
475                 .build();
476         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
477                 .correlationId(pnfName2)
478                 .oldAuthenticationState(oldAuthenticationState)
479                 .newAuthenticationState(newAuthenticationState)
480                 .stateInterface(stateInterface)
481                 .rgwMacAddress(rgwMacAddress)
482                 .swVersion(swVersion)
483                 .build();
484
485         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
486         ServiceInstanceAaiObject hsiCfsServiceInstance =
487                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
488
489         // Prepare Mocks
490         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
491                 hsiCfsServiceInstance.getServiceInstanceId());
492
493         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
494         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
495                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
496         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
497                 .thenReturn(Mono.just(pnfAaiObject))
498                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
499         when(aaiClientTask
500                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
501                 .thenReturn(Mono.just(hsiCfsServiceInstance));
502
503         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
504         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
505
506         // Execute the pipeline
507         StepVerifier.create(pipeline.executePipeline())
508                 .expectSubscription()
509                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
510                 .verifyComplete();
511
512         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
513         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
514         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
515     }
516
517     @Test
518     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
519
520         String pnfName1 = "olt1";
521         String pnfName2 = "olt2";
522         final String oldAuthenticationState = "outOfService";
523         final String newAuthenticationState = "inService";
524         final String stateInterface = "stateInterface";
525         final String rgwMacAddress = "00:0a:95:8d:78:16";
526         final String swVersion = "1.2";
527         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
528
529         // Prepare stubbed replies
530         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
531                 .correlationId(pnfName1)
532                 .oldAuthenticationState(oldAuthenticationState)
533                 .newAuthenticationState(newAuthenticationState)
534                 .stateInterface(stateInterface)
535                 .rgwMacAddress(rgwMacAddress)
536                 .swVersion(swVersion)
537                 .build();
538         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
539                 .correlationId(pnfName2)
540                 .oldAuthenticationState(oldAuthenticationState)
541                 .newAuthenticationState(newAuthenticationState)
542                 .stateInterface(stateInterface)
543                 .rgwMacAddress(rgwMacAddress)
544                 .swVersion(swVersion)
545                 .build();
546
547         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
548         ServiceInstanceAaiObject hsiCfsServiceInstance =
549                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
550
551         // Prepare Mocks
552         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
553                 hsiCfsServiceInstance.getServiceInstanceId());
554
555         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
556         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
557                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
558         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
559                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
560                 .thenReturn(Mono.just(pnfAaiObject));
561         when(aaiClientTask
562                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
563                 .thenReturn(Mono.just(hsiCfsServiceInstance));
564
565         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
566         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
567
568         // Execute the pipeline
569         StepVerifier.create(pipeline.executePipeline())
570                 .expectSubscription()
571                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
572                 .verifyComplete();
573
574         verify(aaiClientTask, times(2))
575                 .executePnfRetrieval(anyString(), anyString());
576         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
577         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
578     }
579
580     private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
581
582         // Build Relationship Data
583         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
584                 ImmutableRelationshipEntryAaiObject.builder()
585                         .relatedTo("service-instance")
586                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
587                                 + "/service-subscription/BBS-CFS/service-instances"
588                                 + "/service-instance/" + hsiCfsServiceInstanceId)
589                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
590                         .relationshipData(Arrays.asList(
591                                 ImmutableRelationshipDataEntryAaiObject.builder()
592                                         .relationshipKey("customer.global-customer-id")
593                                         .relationshipValue("Demonstration").build(),
594                                 ImmutableRelationshipDataEntryAaiObject.builder()
595                                         .relationshipKey("service-subscription.service-type")
596                                         .relationshipValue("BBS-CFS").build(),
597                                 ImmutableRelationshipDataEntryAaiObject.builder()
598                                         .relationshipKey("service-instance.service-instance-id")
599                                         .relationshipValue(hsiCfsServiceInstanceId).build())
600                         )
601                         .relatedToProperties(Collections.singletonList(
602                                 ImmutablePropertyAaiObject.builder()
603                                         .propertyKey("service-instance.service-instance-name")
604                                         .propertyValue("bbs-instance").build())
605                         )
606                         .build();
607
608         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
609                 .relationshipEntries(Collections.singletonList(relationshipEntry))
610                 .build();
611
612         // Finally construct PNF object data
613         return ImmutablePnfAaiObject.builder()
614                 .pnfName(pnfName)
615                 .isInMaintenance(true)
616                 .relationshipListAaiObject(relationshipListAaiObject)
617                 .build();
618     }
619
620     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
621                                                                           String pnfName,
622                                                                           String rgwMacAddress) {
623         String orchestrationStatus = "active";
624
625         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
626                 ImmutableRelationshipEntryAaiObject.builder()
627                         .relatedTo("pnf")
628                         .relatedLink("/pnfs/pnf/" + pnfName)
629                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
630                                 .relationshipKey("pnf.pnf-name")
631                                 .relationshipValue(pnfName).build()))
632                         .build();
633
634         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
635                 .relationshipEntries(Collections.singletonList(relationshipEntry))
636                 .build();
637
638         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
639                 ImmutableMetadataEntryAaiObject.builder()
640                         .metaname("rgw-mac-address")
641                         .metavalue(rgwMacAddress)
642                         .build();
643
644         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
645                 .metadataEntries(Collections.singletonList(metadataEntry))
646                 .build();
647
648         // Finally construct Service Instance object data
649         return ImmutableServiceInstanceAaiObject.builder()
650                 .serviceInstanceId(hsiCfsServiceInstanceId)
651                 .orchestrationStatus(orchestrationStatus)
652                 .relationshipListAaiObject(relationshipListAaiObject)
653                 .metadataListAaiObject(metadataListAaiObject)
654                 .build();
655     }
656 }