2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
 
  51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
 
  52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  61 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  67 import org.springframework.http.HttpStatus;
 
  68 import org.springframework.http.ResponseEntity;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
 
  75 @SuppressWarnings("unchecked")
 
  76 @DisplayName("CPE Authentication Pipeline Unit-Tests")
 
  77 class CpeAuthenticationPipelineTest {
 
  79     private CpeAuthenticationPipeline pipeline; // pipeline under test
 
         // Collaborators passed to the pipeline constructor; each one is assigned a Mockito mock.
  80     private ApplicationConfiguration configuration;
 
  81     private DmaapCpeAuthenticationConsumerTask consumerTask;
 
  82     private DmaapPublisherTask publisherTask;
 
  83     private AaiClientTask aaiClientTask;
 
         // Mocked reply object that the publisher stubs return in the success-path tests.
  85     private ResponseEntity<String> responseEntity;
 
  90         responseEntity = Mockito.mock(ResponseEntity.class);
 
             // Replace every collaborator of the pipeline with a Mockito mock.
  92         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  93         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
 
  94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  95         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
             // The mocked configuration returns a fixed close-loop control name and policy scope.
  97         when(configuration.getCpeAuthenticationCloseLoopControlName())
 
  98                 .thenReturn("controlName");
 
  99         when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
 
 100                 .thenReturn("policyScope");
 
             // Pipeline under test: wired with the mocks above and an empty HashMap as last argument.
 102         pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
 
 103                 publisherTask, aaiClientTask, new HashMap<>());
 
 107     void handleEmptyResponseFromDmaap() throws SSLException {
 
             // Scenario: the DMaaP consumer fails with EmptyDmaapResponseException for any task name.
 109         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 110         when(consumerTask.execute(anyString()))
 
 111                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
             // Subscribe to the pipeline output.
             // NOTE(review): the remaining StepVerifier steps are not visible in this listing - confirm.
 113         StepVerifier.create(pipeline.executePipeline())
 
 114                 .expectSubscription()
 
             // Neither the A&AI client nor the publisher may have been invoked.
 117         verifyZeroInteractions(aaiClientTask);
 
 118         verifyZeroInteractions(publisherTask);
 
 122     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
 
             // Scenario: the timeout getter returns 1 and the consumer Flux never emits any signal.
 125         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 126         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 127                 .thenReturn(Flux.never());
 
             // Subscribe to the pipeline output.
             // NOTE(review): the remaining StepVerifier steps are not visible in this listing - confirm.
 130         StepVerifier.create(pipeline.executePipeline())
 
 131                 .expectSubscription()
 
             // Neither the A&AI client nor the publisher may have been invoked.
 134         verifyZeroInteractions(aaiClientTask);
 
 135         verifyZeroInteractions(publisherTask);
 
 139     void noResponseFromAai_PipelineTimesOut() throws SSLException {
 
             // Values used to populate the single consumed event.
 141         String pnfName = "olt1";
 
 142         final String oldAuthenticationState = "outOfService";
 
 143         final String newAuthenticationState = "inService";
 
 144         final String stateInterface = "stateInterface";
 
 145         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 146         final String swVersion = "1.2";
 
 148         // Prepare stubbed replies
 
             // NOTE(review): the end of this builder chain is not visible in this listing - confirm.
 149         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 150                 .correlationId(pnfName)
 
 151                 .oldAuthenticationState(oldAuthenticationState)
 
 152                 .newAuthenticationState(newAuthenticationState)
 
 153                 .stateInterface(stateInterface)
 
 154                 .rgwMacAddress(rgwMacAddress)
 
 155                 .swVersion(swVersion)
 
             // Scenario: the consumer emits the event, but the PNF retrieval Mono never signals.
 159         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 160         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 161         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
             // Subscribe to the pipeline output.
             // NOTE(review): the remaining StepVerifier steps are not visible in this listing - confirm.
 164         StepVerifier.create(pipeline.executePipeline())
 
 165                 .expectSubscription()
 
             // The publisher may not have been invoked.
 168         verifyZeroInteractions(publisherTask);
 
 172     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
 
             // Values used to populate the consumed event and the stubbed A&AI objects.
 174         String pnfName = "olt1";
 
 175         final String oldAuthenticationState = "outOfService";
 
 176         final String newAuthenticationState = "inService";
 
 177         final String stateInterface = "stateInterface";
 
 178         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 179         final String swVersion = "1.2";
 
 180         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 182         // Prepare stubbed replies
 
             // NOTE(review): the end of this builder chain is not visible in this listing - confirm.
 183         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 184                 .correlationId(pnfName)
 
 185                 .oldAuthenticationState(oldAuthenticationState)
 
 186                 .newAuthenticationState(newAuthenticationState)
 
 187                 .stateInterface(stateInterface)
 
 188                 .rgwMacAddress(rgwMacAddress)
 
 189                 .swVersion(swVersion)
 
             // A&AI objects that reference each other: the PNF links to the service instance id and
             // the service instance carries the same RGW MAC address as the event.
 192         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 193         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 194                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
             // URL argument the service-instance retrieval stub is keyed on.
 197         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 198                 hsiCfsServiceInstance.getServiceInstanceId());
 
             // Scenario: consumption and both A&AI retrievals answer, but the publish Mono never signals.
 200         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 201         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 203         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 204                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver of this stubbing call is on a line not visible here - confirm.
 207                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 208                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 210         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 212         // Execute the pipeline
 
             // NOTE(review): the remaining StepVerifier steps are not visible in this listing - confirm.
 213         StepVerifier.create(pipeline.executePipeline())
 
 214                 .expectSubscription()
 
             // The publisher must have been invoked exactly once (Mockito's default verification mode).
 217         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 221     void singleCorrectEvent_handleSuccessfully() throws SSLException {
 
             // Values used to populate the consumed event and the stubbed A&AI objects.
 223         String pnfName = "olt1";
 
 224         final String oldAuthenticationState = "outOfService";
 
 225         final String newAuthenticationState = "inService";
 
 226         final String stateInterface = "stateInterface";
 
 227         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 228         final String swVersion = "1.2";
 
 229         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 231         // Prepare stubbed replies
 
             // NOTE(review): the end of this builder chain is not visible in this listing - confirm.
 232         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 233                 .correlationId(pnfName)
 
 234                 .oldAuthenticationState(oldAuthenticationState)
 
 235                 .newAuthenticationState(newAuthenticationState)
 
 236                 .stateInterface(stateInterface)
 
 237                 .rgwMacAddress(rgwMacAddress)
 
 238                 .swVersion(swVersion)
 
             // A&AI objects that reference each other; the service instance carries the event's RGW MAC.
 241         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 242         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 243                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
             // URL argument the service-instance retrieval stub is keyed on.
 246         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 247                 hsiCfsServiceInstance.getServiceInstanceId());
 
             // Scenario: every stage answers - one event, matching A&AI data, publisher reply with status OK.
 249         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 250         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 252         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 253                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver of this stubbing call is on a line not visible here - confirm.
 256                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 257                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 259         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 260         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 262         // Execute the pipeline
 
             // One emitted element is asserted to carry HttpStatus.OK.
             // NOTE(review): the terminal StepVerifier steps are not visible in this listing - confirm.
 263         StepVerifier.create(pipeline.executePipeline())
 
 264                 .expectSubscription()
 
 265                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
             // The publisher must have been invoked exactly once (Mockito's default verification mode).
 268         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 272     void twoCorrectEvents_handleSuccessfully() throws SSLException {
 
             // Two independent PNFs, each with its own RGW MAC address and service instance id.
 274         String pnfName1 = "olt1";
 
 275         String pnfName2 = "olt2";
 
 276         final String oldAuthenticationState = "outOfService";
 
 277         final String newAuthenticationState = "inService";
 
 278         final String stateInterface = "stateInterface";
 
 279         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 280         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 281         final String swVersion = "1.2";
 
 282         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 283         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 285         // Prepare stubbed replies
 
             // NOTE(review): the ends of both builder chains are not visible in this listing - confirm.
 286         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 287                 .correlationId(pnfName1)
 
 288                 .oldAuthenticationState(oldAuthenticationState)
 
 289                 .newAuthenticationState(newAuthenticationState)
 
 290                 .stateInterface(stateInterface)
 
 291                 .rgwMacAddress(rgwMacAddress1)
 
 292                 .swVersion(swVersion)
 
 294         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 295                 .correlationId(pnfName2)
 
 296                 .oldAuthenticationState(oldAuthenticationState)
 
 297                 .newAuthenticationState(newAuthenticationState)
 
 298                 .stateInterface(stateInterface)
 
 299                 .rgwMacAddress(rgwMacAddress2)
 
 300                 .swVersion(swVersion)
 
             // Each service instance carries the RGW MAC address of its corresponding event.
 303         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 304         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 305         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 306                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
 307         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 308                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
 
             // Exact URL arguments the A&AI stubs below are keyed on (no wildcard matchers in this test).
 311         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 312         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 313         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 314                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 315         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 316                 hsiCfsServiceInstance2.getServiceInstanceId());
 
             // Scenario: the consumer emits both events and every downstream stage answers for each.
 318         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 319         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 320                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 322         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 323         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the receivers of the next two stubbing calls are on lines not visible here.
 326                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 327                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 329                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 330                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 332         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 333         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 335         // Execute the pipeline
 
             // Two emitted elements are each asserted to carry HttpStatus.OK.
             // NOTE(review): the terminal StepVerifier steps are not visible in this listing - confirm.
 336         StepVerifier.create(pipeline.executePipeline())
 
 337                 .expectSubscription()
 
 338                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 339                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
             // The publisher must have been invoked once per event.
 342         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 346     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
 
             // Values used to populate the single consumed event.
 348         String pnfName = "olt1";
 
 349         final String oldAuthenticationState = "outOfService";
 
 350         final String newAuthenticationState = "inService";
 
 351         final String stateInterface = "stateInterface";
 
 352         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 353         final String swVersion = "1.2";
 
 355         // Prepare stubbed replies
 
             // NOTE(review): the end of this builder chain is not visible in this listing - confirm.
 356         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 357                 .correlationId(pnfName)
 
 358                 .oldAuthenticationState(oldAuthenticationState)
 
 359                 .newAuthenticationState(newAuthenticationState)
 
 360                 .stateInterface(stateInterface)
 
 361                 .rgwMacAddress(rgwMacAddress)
 
 362                 .swVersion(swVersion)
 
             // Scenario: the consumer emits the event, but the PNF retrieval fails with AaiTaskException.
 366         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 367         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 368         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 369                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 371         // Execute the pipeline
 
             // NOTE(review): the remaining StepVerifier steps are not visible in this listing - confirm.
 372         StepVerifier.create(pipeline.executePipeline())
 
 373                 .expectSubscription()
 
             // The PNF retrieval is the only A&AI call made, and the publisher is never reached.
 376         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 377         verifyNoMoreInteractions(aaiClientTask);
 
 378         verifyZeroInteractions(publisherTask);
 
 382     void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
 
             // Two independent PNFs, each with its own RGW MAC address and service instance id.
 384         String pnfName1 = "olt1";
 
 385         String pnfName2 = "olt2";
 
 386         final String oldAuthenticationState = "outOfService";
 
 387         final String newAuthenticationState = "inService";
 
 388         final String stateInterface = "stateInterface";
 
 389         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 390         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 391         final String swVersion = "1.2";
 
 392         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 393         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 395         // Prepare stubbed replies
 
             // NOTE(review): the ends of both builder chains are not visible in this listing - confirm.
 396         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 397                 .correlationId(pnfName1)
 
 398                 .oldAuthenticationState(oldAuthenticationState)
 
 399                 .newAuthenticationState(newAuthenticationState)
 
 400                 .stateInterface(stateInterface)
 
 401                 .rgwMacAddress(rgwMacAddress1)
 
 402                 .swVersion(swVersion)
 
 404         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 405                 .correlationId(pnfName2)
 
 406                 .oldAuthenticationState(oldAuthenticationState)
 
 407                 .newAuthenticationState(newAuthenticationState)
 
 408                 .stateInterface(stateInterface)
 
 409                 .rgwMacAddress(rgwMacAddress2)
 
 410                 .swVersion(swVersion)
 
             // The first service instance carries the first event's MAC; the second deliberately carries
             // a metadata value that differs from the second event's MAC address.
 413         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 414         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 415         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 416                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
 417         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 418                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
 
 419                         "Having unmatched RGW MAC address");
 
             // Exact URL arguments the A&AI stubs below are keyed on (no wildcard matchers in this test).
 422         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 423         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 424         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 425                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 426         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 427                 hsiCfsServiceInstance2.getServiceInstanceId());
 
             // Scenario: both events are consumed and all A&AI lookups answer for both of them.
 429         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 430         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 431                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 433         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 434         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the receivers of the next two stubbing calls are on lines not visible here.
 437                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 438                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 440                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 441                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 443         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 444         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 446         // Execute the pipeline
 
             // Only one emitted element is asserted here, and it must carry HttpStatus.OK.
             // NOTE(review): the terminal StepVerifier steps are not visible in this listing - confirm.
 447         StepVerifier.create(pipeline.executePipeline())
 
 448                 .expectSubscription()
 
 449                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
             // The publisher must have been invoked exactly once, although two events were consumed.
 452         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 456     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
 
             // Two events that differ only in correlation id; A&AI data is prepared for the first PNF only.
 458         String pnfName1 = "olt1";
 
 459         String pnfName2 = "olt2";
 
 460         final String oldAuthenticationState = "outOfService";
 
 461         final String newAuthenticationState = "inService";
 
 462         final String stateInterface = "stateInterface";
 
 463         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 464         final String swVersion = "1.2";
 
 465         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 467         // Prepare stubbed replies
 
             // NOTE(review): the ends of both builder chains are not visible in this listing - confirm.
 468         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 469                 .correlationId(pnfName1)
 
 470                 .oldAuthenticationState(oldAuthenticationState)
 
 471                 .newAuthenticationState(newAuthenticationState)
 
 472                 .stateInterface(stateInterface)
 
 473                 .rgwMacAddress(rgwMacAddress)
 
 474                 .swVersion(swVersion)
 
 476         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 477                 .correlationId(pnfName2)
 
 478                 .oldAuthenticationState(oldAuthenticationState)
 
 479                 .newAuthenticationState(newAuthenticationState)
 
 480                 .stateInterface(stateInterface)
 
 481                 .rgwMacAddress(rgwMacAddress)
 
 482                 .swVersion(swVersion)
 
             // A&AI objects built for the first PNF name.
 485         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
 
 486         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 487                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
 
             // URL argument the service-instance retrieval stub is keyed on.
 490         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 491                 hsiCfsServiceInstance.getServiceInstanceId());
 
 493         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 494         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 495                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first PNF retrieval call succeeds, later calls fail with
             // AaiTaskException.
 496         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 497                 .thenReturn(Mono.just(pnfAaiObject))
 
 498                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
             // NOTE(review): the receiver of this stubbing call is on a line not visible here - confirm.
 500                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 501                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 503         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 504         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 506         // Execute the pipeline
 
             // Only one emitted element is asserted here, and it must carry HttpStatus.OK.
             // NOTE(review): the terminal StepVerifier steps are not visible in this listing - confirm.
 507         StepVerifier.create(pipeline.executePipeline())
 
 508                 .expectSubscription()
 
 509                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
             // Both events trigger a PNF retrieval; only one goes on to the service-instance retrieval
             // and to the publisher.
 512         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 513         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 514         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 518     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
 
             // Two events that differ only in correlation id; A&AI data is prepared for the second PNF only.
 520         String pnfName1 = "olt1";
 
 521         String pnfName2 = "olt2";
 
 522         final String oldAuthenticationState = "outOfService";
 
 523         final String newAuthenticationState = "inService";
 
 524         final String stateInterface = "stateInterface";
 
 525         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 526         final String swVersion = "1.2";
 
 527         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 529         // Prepare stubbed replies
 
             // NOTE(review): the ends of both builder chains are not visible in this listing - confirm.
 530         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 531                 .correlationId(pnfName1)
 
 532                 .oldAuthenticationState(oldAuthenticationState)
 
 533                 .newAuthenticationState(newAuthenticationState)
 
 534                 .stateInterface(stateInterface)
 
 535                 .rgwMacAddress(rgwMacAddress)
 
 536                 .swVersion(swVersion)
 
 538         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 539                 .correlationId(pnfName2)
 
 540                 .oldAuthenticationState(oldAuthenticationState)
 
 541                 .newAuthenticationState(newAuthenticationState)
 
 542                 .stateInterface(stateInterface)
 
 543                 .rgwMacAddress(rgwMacAddress)
 
 544                 .swVersion(swVersion)
 
             // A&AI objects built for the second PNF name.
 547         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
 
 548         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 549                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
 
             // URL argument the service-instance retrieval stub is keyed on.
 552         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 553                 hsiCfsServiceInstance.getServiceInstanceId());
 
 555         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 556         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 557                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first PNF retrieval call fails with AaiTaskException, later
             // calls succeed.
 558         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 559                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 560                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver of this stubbing call is on a line not visible here - confirm.
 562                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 563                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 565         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 566         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 568         // Execute the pipeline
 
             // Only one emitted element is asserted here, and it must carry HttpStatus.OK.
             // NOTE(review): the terminal StepVerifier steps are not visible in this listing - confirm.
 569         StepVerifier.create(pipeline.executePipeline())
 
 570                 .expectSubscription()
 
 571                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
             // Both events trigger a PNF retrieval; only one goes on to the service-instance retrieval
             // and to the publisher.
 574         verify(aaiClientTask, times(2))
 
 575                 .executePnfRetrieval(anyString(), anyString());
 
 576         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 577         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 580     private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
 
             // Test-data helper: returns a PnfAaiObject flagged as in maintenance whose relationship
             // list holds a single "service-instance" entry for hsiCfsServiceInstanceId.
             // NOTE(review): some builder-chain lines (including any use of pnfName) are not visible
             // in this listing - confirm against the full file.
 582         // Build Relationship Data
 
 583         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 584                 ImmutableRelationshipEntryAaiObject.builder()
 
 585                         .relatedTo("service-instance")
 
 586                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 587                                 + "/service-subscription/BBS-CFS/service-instances"
 
 588                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 589                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
                             // Three key/value pairs: customer id, service type and service instance id.
 590                         .relationshipData(Arrays.asList(
 
 591                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 592                                         .relationshipKey("customer.global-customer-id")
 
 593                                         .relationshipValue("Demonstration").build(),
 
 594                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 595                                         .relationshipKey("service-subscription.service-type")
 
 596                                         .relationshipValue("BBS-CFS").build(),
 
 597                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 598                                         .relationshipKey("service-instance.service-instance-id")
 
 599                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 601                         .relatedToProperties(Collections.singletonList(
 
 602                                 ImmutablePropertyAaiObject.builder()
 
 603                                         .propertyKey("service-instance.service-instance-name")
 
 604                                         .propertyValue("bbs-instance").build())
 
             // Wrap the single entry into the relationship list attached to the PNF.
 608         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 609                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 612         // Finally construct PNF object data
 
 613         return ImmutablePnfAaiObject.builder()
 
 615                 .isInMaintenance(true)
 
 616                 .relationshipListAaiObject(relationshipListAaiObject)
 
 620     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
 622                                                                           String rgwMacAddress) {
 
             // Test-data helper: returns a ServiceInstanceAaiObject with the given id, orchestration
             // status "active", one relationship entry for the PNF and one "rgw-mac-address" metadata entry.
             // NOTE(review): pnfName is used below but its parameter declaration, and the ends of the
             // builder chains, are on lines not visible in this listing - confirm against the full file.
 623         String orchestrationStatus = "active";
 
             // Relationship entry whose link and "pnf.pnf-name" value both come from pnfName.
 625         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 626                 ImmutableRelationshipEntryAaiObject.builder()
 
 628                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 629                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 630                                 .relationshipKey("pnf.pnf-name")
 
 631                                 .relationshipValue(pnfName).build()))
 
 634         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 635                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
             // Metadata entry that stores the supplied RGW MAC address under "rgw-mac-address".
 638         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 639                 ImmutableMetadataEntryAaiObject.builder()
 
 640                         .metaname("rgw-mac-address")
 
 641                         .metavalue(rgwMacAddress)
 
 644         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 645                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 648         // Finally construct Service Instance object data
 
 649         return ImmutableServiceInstanceAaiObject.builder()
 
 650                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 651                 .orchestrationStatus(orchestrationStatus)
 
 652                 .relationshipListAaiObject(relationshipListAaiObject)
 
 653                 .metadataListAaiObject(metadataListAaiObject)