2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
 
  51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
 
  52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  61 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  67 import org.springframework.http.HttpStatus;
 
  68 import org.springframework.http.ResponseEntity;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
 
  75 @SuppressWarnings("unchecked")
 
  76 @DisplayName("CPE Authentication Pipeline Unit-Tests")
 
  77 class CpeAuthenticationPipelineTest {
 
  79     private CpeAuthenticationPipeline pipeline;
 
  80     private ApplicationConfiguration configuration;
 
  81     private DmaapCpeAuthenticationConsumerTask consumerTask;
 
  82     private DmaapPublisherTask publisherTask;
 
  83     private AaiClientTask aaiClientTask;
 
  85     private ResponseEntity<String> responseEntity;
 
  90         responseEntity = Mockito.mock(ResponseEntity.class);
 
  92         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  93         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
 
  94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  95         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
  97         when(configuration.getCpeAuthenticationCloseLoopControlName())
 
  98                 .thenReturn("controlName");
 
  99         when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
 
 100                 .thenReturn("policyScope");
 
 101         when(configuration.getPolicyVersion())
 
 102                 .thenReturn("1.0.0");
 
 103         when(configuration.getCloseLoopTargetType())
 
 105         when(configuration.getCloseLoopEventStatus())
 
 106                 .thenReturn("ONSET");
 
 107         when(configuration.getCloseLoopVersion())
 
 108                 .thenReturn("1.0.2");
 
 109         when(configuration.getCloseLoopTarget())
 
 110                 .thenReturn("CL-Target");
 
 111         when(configuration.getCloseLoopOriginator())
 
 112                 .thenReturn("DCAE-BBS-ep");
 
 114         pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
 
 115                 publisherTask, aaiClientTask, new HashMap<>());
 
 119     void handleEmptyResponseFromDmaap() throws SSLException {
 
 121         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 122         when(consumerTask.execute(anyString()))
 
 123                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
 125         StepVerifier.create(pipeline.executePipeline())
 
 126                 .expectSubscription()
 
 129         verifyZeroInteractions(aaiClientTask);
 
 130         verifyZeroInteractions(publisherTask);
 
 134     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
 
 137         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 138         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 139                 .thenReturn(Flux.never());
 
 142         StepVerifier.create(pipeline.executePipeline())
 
 143                 .expectSubscription()
 
 146         verifyZeroInteractions(aaiClientTask);
 
 147         verifyZeroInteractions(publisherTask);
 
 151     void noResponseFromAai_PipelineTimesOut() throws SSLException {
 
 153         String pnfName = "olt1";
 
 154         final String oldAuthenticationState = "outOfService";
 
 155         final String newAuthenticationState = "inService";
 
 156         final String stateInterface = "stateInterface";
 
 157         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 158         final String swVersion = "1.2";
 
 160         // Prepare stubbed replies
 
 161         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 162                 .correlationId(pnfName)
 
 163                 .oldAuthenticationState(oldAuthenticationState)
 
 164                 .newAuthenticationState(newAuthenticationState)
 
 165                 .stateInterface(stateInterface)
 
 166                 .rgwMacAddress(rgwMacAddress)
 
 167                 .swVersion(swVersion)
 
 171         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 172         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 173         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
 176         StepVerifier.create(pipeline.executePipeline())
 
 177                 .expectSubscription()
 
 180         verifyZeroInteractions(publisherTask);
 
 184     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
 
 186         String pnfName = "olt1";
 
 187         final String oldAuthenticationState = "outOfService";
 
 188         final String newAuthenticationState = "inService";
 
 189         final String stateInterface = "stateInterface";
 
 190         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 191         final String swVersion = "1.2";
 
 192         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 194         // Prepare stubbed replies
 
 195         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 196                 .correlationId(pnfName)
 
 197                 .oldAuthenticationState(oldAuthenticationState)
 
 198                 .newAuthenticationState(newAuthenticationState)
 
 199                 .stateInterface(stateInterface)
 
 200                 .rgwMacAddress(rgwMacAddress)
 
 201                 .swVersion(swVersion)
 
 204         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 205         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 206                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
 209         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 210                 hsiCfsServiceInstance.getServiceInstanceId());
 
 212         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 213         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 215         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 216                 .thenReturn(Mono.just(pnfAaiObject));
 
 219                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 220                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 222         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 224         // Execute the pipeline
 
 225         StepVerifier.create(pipeline.executePipeline())
 
 226                 .expectSubscription()
 
 229         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 233     void singleCorrectEvent_handleSuccessfully() throws SSLException {
 
 235         String pnfName = "olt1";
 
 236         final String oldAuthenticationState = "outOfService";
 
 237         final String newAuthenticationState = "inService";
 
 238         final String stateInterface = "stateInterface";
 
 239         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 240         final String swVersion = "1.2";
 
 241         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 243         // Prepare stubbed replies
 
 244         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 245                 .correlationId(pnfName)
 
 246                 .oldAuthenticationState(oldAuthenticationState)
 
 247                 .newAuthenticationState(newAuthenticationState)
 
 248                 .stateInterface(stateInterface)
 
 249                 .rgwMacAddress(rgwMacAddress)
 
 250                 .swVersion(swVersion)
 
 253         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 254         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 255                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
 258         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 259                 hsiCfsServiceInstance.getServiceInstanceId());
 
 261         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 262         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 264         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 265                 .thenReturn(Mono.just(pnfAaiObject));
 
 268                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 269                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 271         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 272         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 274         // Execute the pipeline
 
 275         StepVerifier.create(pipeline.executePipeline())
 
 276                 .expectSubscription()
 
 277                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 280         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 284     void twoCorrectEvents_handleSuccessfully() throws SSLException {
 
 286         String pnfName1 = "olt1";
 
 287         String pnfName2 = "olt2";
 
 288         final String oldAuthenticationState = "outOfService";
 
 289         final String newAuthenticationState = "inService";
 
 290         final String stateInterface = "stateInterface";
 
 291         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 292         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 293         final String swVersion = "1.2";
 
 294         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 295         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 297         // Prepare stubbed replies
 
 298         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 299                 .correlationId(pnfName1)
 
 300                 .oldAuthenticationState(oldAuthenticationState)
 
 301                 .newAuthenticationState(newAuthenticationState)
 
 302                 .stateInterface(stateInterface)
 
 303                 .rgwMacAddress(rgwMacAddress1)
 
 304                 .swVersion(swVersion)
 
 306         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 307                 .correlationId(pnfName2)
 
 308                 .oldAuthenticationState(oldAuthenticationState)
 
 309                 .newAuthenticationState(newAuthenticationState)
 
 310                 .stateInterface(stateInterface)
 
 311                 .rgwMacAddress(rgwMacAddress2)
 
 312                 .swVersion(swVersion)
 
 315         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 316         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 317         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 318                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
 319         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 320                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
 
 323         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 324         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 325         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 326                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 327         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 328                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 330         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 331         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 332                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 334         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 335         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
 338                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 339                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 341                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 342                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 344         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 345         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 347         // Execute the pipeline
 
 348         StepVerifier.create(pipeline.executePipeline())
 
 349                 .expectSubscription()
 
 350                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 351                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 354         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 358     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
 
 360         String pnfName = "olt1";
 
 361         final String oldAuthenticationState = "outOfService";
 
 362         final String newAuthenticationState = "inService";
 
 363         final String stateInterface = "stateInterface";
 
 364         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 365         final String swVersion = "1.2";
 
 367         // Prepare stubbed replies
 
 368         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 369                 .correlationId(pnfName)
 
 370                 .oldAuthenticationState(oldAuthenticationState)
 
 371                 .newAuthenticationState(newAuthenticationState)
 
 372                 .stateInterface(stateInterface)
 
 373                 .rgwMacAddress(rgwMacAddress)
 
 374                 .swVersion(swVersion)
 
 378         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 379         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 380         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 381                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 383         // Execute the pipeline
 
 384         StepVerifier.create(pipeline.executePipeline())
 
 385                 .expectSubscription()
 
 388         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 389         verifyNoMoreInteractions(aaiClientTask);
 
 390         verifyZeroInteractions(publisherTask);
 
 394     void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
 
 396         String pnfName1 = "olt1";
 
 397         String pnfName2 = "olt2";
 
 398         final String oldAuthenticationState = "outOfService";
 
 399         final String newAuthenticationState = "inService";
 
 400         final String stateInterface = "stateInterface";
 
 401         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 402         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 403         final String swVersion = "1.2";
 
 404         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 405         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 407         // Prepare stubbed replies
 
 408         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 409                 .correlationId(pnfName1)
 
 410                 .oldAuthenticationState(oldAuthenticationState)
 
 411                 .newAuthenticationState(newAuthenticationState)
 
 412                 .stateInterface(stateInterface)
 
 413                 .rgwMacAddress(rgwMacAddress1)
 
 414                 .swVersion(swVersion)
 
 416         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 417                 .correlationId(pnfName2)
 
 418                 .oldAuthenticationState(oldAuthenticationState)
 
 419                 .newAuthenticationState(newAuthenticationState)
 
 420                 .stateInterface(stateInterface)
 
 421                 .rgwMacAddress(rgwMacAddress2)
 
 422                 .swVersion(swVersion)
 
 425         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 426         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 427         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 428                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
 429         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 430                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
 
 431                         "Having unmatched RGW MAC address");
 
 434         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 435         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 436         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 437                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 438         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 439                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 441         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 442         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 443                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 445         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 446         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
 449                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 450                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 452                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 453                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 455         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 456         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 458         // Execute the pipeline
 
 459         StepVerifier.create(pipeline.executePipeline())
 
 460                 .expectSubscription()
 
 461                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 464         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 468     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
 
 470         String pnfName1 = "olt1";
 
 471         String pnfName2 = "olt2";
 
 472         final String oldAuthenticationState = "outOfService";
 
 473         final String newAuthenticationState = "inService";
 
 474         final String stateInterface = "stateInterface";
 
 475         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 476         final String swVersion = "1.2";
 
 477         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 479         // Prepare stubbed replies
 
 480         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 481                 .correlationId(pnfName1)
 
 482                 .oldAuthenticationState(oldAuthenticationState)
 
 483                 .newAuthenticationState(newAuthenticationState)
 
 484                 .stateInterface(stateInterface)
 
 485                 .rgwMacAddress(rgwMacAddress)
 
 486                 .swVersion(swVersion)
 
 488         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 489                 .correlationId(pnfName2)
 
 490                 .oldAuthenticationState(oldAuthenticationState)
 
 491                 .newAuthenticationState(newAuthenticationState)
 
 492                 .stateInterface(stateInterface)
 
 493                 .rgwMacAddress(rgwMacAddress)
 
 494                 .swVersion(swVersion)
 
 497         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
 
 498         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 499                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
 
 502         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 503                 hsiCfsServiceInstance.getServiceInstanceId());
 
 505         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 506         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 507                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 508         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 509                 .thenReturn(Mono.just(pnfAaiObject))
 
 510                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 512                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 513                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 515         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 516         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 518         // Execute the pipeline
 
 519         StepVerifier.create(pipeline.executePipeline())
 
 520                 .expectSubscription()
 
 521                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 524         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 525         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 526         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 530     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
 
 532         String pnfName1 = "olt1";
 
 533         String pnfName2 = "olt2";
 
 534         final String oldAuthenticationState = "outOfService";
 
 535         final String newAuthenticationState = "inService";
 
 536         final String stateInterface = "stateInterface";
 
 537         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 538         final String swVersion = "1.2";
 
 539         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 541         // Prepare stubbed replies
 
 542         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 543                 .correlationId(pnfName1)
 
 544                 .oldAuthenticationState(oldAuthenticationState)
 
 545                 .newAuthenticationState(newAuthenticationState)
 
 546                 .stateInterface(stateInterface)
 
 547                 .rgwMacAddress(rgwMacAddress)
 
 548                 .swVersion(swVersion)
 
 550         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 551                 .correlationId(pnfName2)
 
 552                 .oldAuthenticationState(oldAuthenticationState)
 
 553                 .newAuthenticationState(newAuthenticationState)
 
 554                 .stateInterface(stateInterface)
 
 555                 .rgwMacAddress(rgwMacAddress)
 
 556                 .swVersion(swVersion)
 
 559         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
 
 560         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 561                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
 
 564         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 565                 hsiCfsServiceInstance.getServiceInstanceId());
 
 567         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 568         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 569                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 570         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 571                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 572                 .thenReturn(Mono.just(pnfAaiObject));
 
 574                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 575                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 577         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 578         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 580         // Execute the pipeline
 
 581         StepVerifier.create(pipeline.executePipeline())
 
 582                 .expectSubscription()
 
 583                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 586         verify(aaiClientTask, times(2))
 
 587                 .executePnfRetrieval(anyString(), anyString());
 
 588         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 589         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 592     private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
 
 594         // Build Relationship Data
 
 595         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 596                 ImmutableRelationshipEntryAaiObject.builder()
 
 597                         .relatedTo("service-instance")
 
 598                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 599                                 + "/service-subscription/BBS-CFS/service-instances"
 
 600                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 601                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
 602                         .relationshipData(Arrays.asList(
 
 603                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 604                                         .relationshipKey("customer.global-customer-id")
 
 605                                         .relationshipValue("Demonstration").build(),
 
 606                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 607                                         .relationshipKey("service-subscription.service-type")
 
 608                                         .relationshipValue("BBS-CFS").build(),
 
 609                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 610                                         .relationshipKey("service-instance.service-instance-id")
 
 611                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 613                         .relatedToProperties(Collections.singletonList(
 
 614                                 ImmutablePropertyAaiObject.builder()
 
 615                                         .propertyKey("service-instance.service-instance-name")
 
 616                                         .propertyValue("bbs-instance").build())
 
 620         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 621                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 624         // Finally construct PNF object data
 
 625         return ImmutablePnfAaiObject.builder()
 
 627                 .isInMaintenance(true)
 
 628                 .relationshipListAaiObject(relationshipListAaiObject)
 
 632     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
 634                                                                           String rgwMacAddress) {
 
 635         String orchestrationStatus = "active";
 
 637         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 638                 ImmutableRelationshipEntryAaiObject.builder()
 
 640                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 641                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 642                                 .relationshipKey("pnf.pnf-name")
 
 643                                 .relationshipValue(pnfName).build()))
 
 646         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 647                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 650         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 651                 ImmutableMetadataEntryAaiObject.builder()
 
 652                         .metaname("rgw-mac-address")
 
 653                         .metavalue(rgwMacAddress)
 
 656         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 657                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 660         // Finally construct Service Instance object data
 
 661         return ImmutableServiceInstanceAaiObject.builder()
 
 662                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 663                 .orchestrationStatus(orchestrationStatus)
 
 664                 .relationshipListAaiObject(relationshipListAaiObject)
 
 665                 .metadataListAaiObject(metadataListAaiObject)