2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.common.rules.test;
 
  23 import static org.junit.Assert.assertEquals;
 
  25 import java.util.Collections;
 
  26 import java.util.List;
 
  28 import java.util.UUID;
 
  29 import java.util.concurrent.TimeUnit;
 
  30 import java.util.function.Supplier;
 
  31 import java.util.stream.Collectors;
 
  32 import java.util.stream.Stream;
 
  33 import lombok.AccessLevel;
 
  36 import org.apache.commons.collections.MapUtils;
 
  37 import org.awaitility.Awaitility;
 
  38 import org.junit.Test;
 
  39 import org.onap.policy.appc.Request;
 
  40 import org.onap.policy.appclcm.AppcLcmDmaapWrapper;
 
  41 import org.onap.policy.common.utils.coder.Coder;
 
  42 import org.onap.policy.common.utils.coder.CoderException;
 
  43 import org.onap.policy.common.utils.coder.StandardCoder;
 
  44 import org.onap.policy.common.utils.coder.StandardCoderInstantAsMillis;
 
  45 import org.onap.policy.controlloop.ControlLoopNotificationType;
 
  46 import org.onap.policy.controlloop.VirtualControlLoopNotification;
 
  47 import org.onap.policy.controlloop.eventmanager.ControlLoopEventManager;
 
  48 import org.onap.policy.drools.system.PolicyEngineConstants;
 
  49 import org.onap.policy.drools.system.internal.SimpleLockManager;
 
  50 import org.onap.policy.drools.system.internal.SimpleLockManager.SimpleLock;
 
  51 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
 
  52 import org.onap.policy.sdnr.PciMessage;
 
  53 import org.powermock.reflect.Whitebox;
 
  56  * Superclass used for rule tests.
 
  58 public abstract class BaseTest {
 
  59     private static final String APPC_RESTART_OP = "restart";
 
  60     private static final String APPC_MODIFY_CONFIG_OP = "ModifyConfig";
 
  62     private static final String SDNR_MODIFY_CONFIG_OP = "ModifyConfig";
 
  63     private static final String SNDR_MODIFY_CONFIG_ANR_OP = "ModifyConfigANR";
 
  66      * Canonical Topic Names.
 
  68     protected static final String DCAE_TOPIC = "DCAE_TOPIC";
 
  69     protected static final String APPC_LCM_WRITE_TOPIC = "APPC-LCM-WRITE";
 
  70     protected static final String POLICY_CL_MGT_TOPIC = "POLICY-CL-MGT";
 
  71     protected static final String APPC_LCM_READ_TOPIC = "APPC-LCM-READ";
 
  72     protected static final String APPC_CL_TOPIC = "APPC-CL";
 
  73     protected static final String SDNR_CL_TOPIC = "SDNR-CL";
 
  74     protected static final String SDNR_CL_RSP_TOPIC = "SDNR-CL-RSP";
 
  75     protected static final String A1P_CL_TOPIC = "A1-P";
 
  76     protected static final String A1P_CL_RSP_TOPIC = "A1-P-RSP";
 
  79      * Constants for each test case.
 
  82     // service123 (i.e., multi-operation policy)
 
  83     private static final String SERVICE123_TOSCA_COMPLIANT_POLICY = "service123/tosca-compliant-service123.json";
 
  84     private static final String SERVICE123_ONSET = "service123/service123.onset.json";
 
  85     private static final String SERVICE123_APPC_RESTART_FAILURE = "service123/service123.appc.restart.failure.json";
 
  86     private static final String SERVICE123_APPC_REBUILD_FAILURE = "service123/service123.appc.rebuild.failure.json";
 
  87     private static final String SERVICE123_APPC_MIGRATE_SUCCESS = "service123/service123.appc.migrate.success.json";
 
  89     // duplicates (i.e., mutliple events in the engine at the same time)
 
  90     private static final String DUPLICATES_TOSCA_COMPLIANT_POLICY = "duplicates/tosca-compliant-duplicates.json";
 
  91     private static final String DUPLICATES_ONSET_1 = "duplicates/duplicates.onset.1.json";
 
  92     private static final String DUPLICATES_ONSET_2 = "duplicates/duplicates.onset.2.json";
 
  93     private static final String DUPLICATES_APPC_SUCCESS = "duplicates/duplicates.appc.success.json";
 
  96     private static final String VCPE_TOSCA_COMPLIANT_POLICY = "vcpe/tosca-compliant-vcpe.json";
 
  97     private static final String VCPE_ONSET_1 = "vcpe/vcpe.onset.1.json";
 
  98     private static final String VCPE_ONSET_2 = "vcpe/vcpe.onset.2.json";
 
  99     private static final String VCPE_ONSET_3 = "vcpe/vcpe.onset.3.json";
 
 100     private static final String VCPE_APPC_SUCCESS = "vcpe/vcpe.appc.success.json";
 
 103     private static final String VDNS_TOSCA_COMPLIANT_POLICY = "vdns/tosca-compliant-vdns.json";
 
 104     private static final String VDNS_TOSCA_COMPLIANT_RAINY_POLICY = "vdns/tosca-compliant-vdns-rainy.json";
 
 105     private static final String VDNS_ONSET = "vdns/vdns.onset.json";
 
 108     private static final String VFW_TOSCA_COMPLIANT_POLICY = "vfw/tosca-compliant-vfw.json";
 
 109     private static final String VFW_TOSCA_COMPLIANT_TIME_OUT_POLICY = "vfw/tosca-compliant-timeout-vfw.json";
 
 110     private static final String VFW_ONSET = "vfw/vfw.onset.json";
 
 111     private static final String VFW_APPC_SUCCESS = "vfw/vfw.appc.success.json";
 
 112     private static final String VFW_APPC_FAILURE = "vfw/vfw.appc.failure.json";
 
 114     // 5G SON Legacy - PCI
 
 115     private static final String VPCI_TOSCA_COMPLIANT_POLICY = "vpci/tosca-compliant-vpci.json";
 
 116     private static final String VPCI_ONSET = "vpci/vpci.onset.json";
 
 117     private static final String VPCI_SDNR_SUCCESS = "vpci/vpci.sdnr.success.json";
 
 119     // 5G SON Legacy - ANR
 
 120     private static final String VSONH_TOSCA_COMPLIANT_POLICY = "vsonh/tosca-compliant-vsonh.json";
 
 121     private static final String VSONH_ONSET = "vsonh/vsonh.onset.json";
 
 122     private static final String VSONH_SDNR_SUCCESS = "vsonh/vsonh.sdnr.success.json";
 
 124     // 5G SON Use case Policies (Kohn+)
 
 126     private static final String V5G_SON_O1_TOSCA_POLICY = "policies/v5gSonO1.policy.operational.input.tosca.json";
 
 127     private static final String V5G_SON_O1_ONSET = "vpci/v5G.son.O1.onset.json";
 
 128     private static final String V5G_SON_O1_SDNR_SUCCESS = "vpci/v5G.son.O1.sdnr.success.json";
 
 129     private static final String MODIFY_O1_CONFIG_OPERATION = "ModifyO1Config";
 
 131     private static final String V5G_SON_A1_TOSCA_POLICY = "policies/v5gSonA1.policy.operational.input.tosca.json";
 
 132     private static final String V5G_SON_A1_ONSET = "vsonh/v5G.son.A1.onset.json";
 
 133     private static final String V5G_SON_A1_SDNR_SUCCESS = "vsonh/v5G.son.A1.sdnr.success.json";
 
 134     private static final String PUT_A1_POLICY_OPERATION = "putA1Policy";
 
 136      * Coders used to decode requests and responses.
 
 138     protected static final Coder APPC_LEGACY_CODER = new StandardCoderInstantAsMillis();
 
 139     protected static final Coder APPC_LCM_CODER = new StandardCoder();
 
 140     protected static final Coder POLICY_CL_MGT_CODER = new PolicyClMgtCoder();
 
 143      * Coders used to decode requests and responses.
 
 145     private static final Coder SDNR_CODER = new StandardCoder();
 
 147     // these may be overridden by junit tests
 
 148     protected static Supplier<HttpClients> httpClientMaker = HttpClients::new;
 
 149     protected static Supplier<Simulators> simMaker = Simulators::new;
 
 150     protected static Supplier<Topics> topicMaker = Topics::new;
 
 152     protected static Rules rules;
 
 153     protected static HttpClients httpClients;
 
 154     protected static Simulators simulators;
 
 156     // used to inject and wait for messages
 
 157     @Getter(AccessLevel.PROTECTED)
 
 158     @Setter(AccessLevel.PROTECTED)
 
 159     protected static Topics topics;
 
 161     // used to wait for messages on SINK topics
 
 162     protected Listener<VirtualControlLoopNotification> policyClMgt;
 
 163     protected Listener<Request> appcClSink;
 
 164     protected Listener<AppcLcmDmaapWrapper> appcLcmRead;
 
 165     protected Listener<PciMessage> sdnrClSink;
 
 168      * Tosca Policy that was loaded.
 
 170     protected ToscaPolicy policy;
 
 173      * Initializes {@link #rules}, {@link #httpClients}, and {@link #simulators}.
 
 175     public static void initStatics() {
 
 176         httpClients = httpClientMaker.get();
 
 177         simulators = simMaker.get();
 
 181      * Destroys {@link #httpClients}, {@link #simulators}, and {@link #rules}.
 
 183     public static void finishStatics() {
 
 184         httpClients.destroy();
 
 185         simulators.destroy();
 
 189      * Initializes {@link #topics}.
 
 192         setTopics(topicMaker.get());
 
 194         Map<String, SimpleLock> locks = getLockMap();
 
 195         if (!MapUtils.isEmpty(locks)) {
 
 201      * Destroys {@link #topics} and resets the rule facts.
 
 203     public void finish() {
 
 207     // Service123 (i.e., Policy with multiple operations)
 
 210      * Service123 with Tosca Compliant Policy.
 
 213     public void testService123Compliant() {
 
 214         policyClMgt = createNoficationTopicListener();
 
 215         appcLcmRead = topics.createListener(APPC_LCM_READ_TOPIC, AppcLcmDmaapWrapper.class, APPC_LCM_CODER);
 
 216         policy = checkPolicy(SERVICE123_TOSCA_COMPLIANT_POLICY);
 
 218         // inject an ONSET event over the DCAE topic
 
 219         topics.inject(DCAE_TOPIC, SERVICE123_ONSET);
 
 220         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 222         waitForLockAndPermit(policy, policyClMgt);
 
 224         // restart request should be sent and fail four times (i.e., because retry=3)
 
 225         for (var count = 0; count < 4; ++count) {
 
 226             AppcLcmDmaapWrapper appcreq = appcLcmRead.await(req -> APPC_RESTART_OP.equals(req.getRpcName()));
 
 228             topics.inject(APPC_LCM_WRITE_TOPIC, SERVICE123_APPC_RESTART_FAILURE,
 
 229                             appcreq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 231         // rebuild request should be sent and fail once
 
 232         AppcLcmDmaapWrapper appcreq = appcLcmRead.await(req -> "rebuild".equals(req.getRpcName()));
 
 233         topics.inject(APPC_LCM_WRITE_TOPIC, SERVICE123_APPC_REBUILD_FAILURE,
 
 234                         appcreq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 235         // migrate request should be sent and succeed
 
 236         appcreq = appcLcmRead.await(req -> "migrate".equals(req.getRpcName()));
 
 237         topics.inject(APPC_LCM_WRITE_TOPIC, SERVICE123_APPC_MIGRATE_SUCCESS,
 
 238                         appcreq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 239         /* --- Operation Completed --- */
 
 240         waitForOperationSuccess();
 
 241         /* --- Transaction Completed --- */
 
 242         waitForFinalSuccess(policy, policyClMgt);
 
 250      * This test case tests the scenario where 3 events occur and 2 of the requests refer
 
 251      * to the same target entity while the 3rd is for another entity. The expected result
 
 252      * is that the event with the duplicate target entity will have a final success result
 
 253      * for one of the events, and a rejected message for the one that was unable to obtain
 
 254      * the lock. The event that is referring to a different target entity should be able
 
 255      * to obtain a lock since it is a different target. After processing of all events
 
 256      * there should only be the policy and params objects left in memory.
 
 259     public void testDuplicatesEvents() {
 
 260         policyClMgt = createNoficationTopicListener();
 
 261         appcLcmRead = topics.createListener(APPC_LCM_READ_TOPIC, AppcLcmDmaapWrapper.class, APPC_LCM_CODER);
 
 263         policy = checkPolicy(DUPLICATES_TOSCA_COMPLIANT_POLICY);
 
 265         final long initCount = getCreateCount();
 
 268          * Inject ONSET events over the DCAE topic. First and last have the same target
 
 269          * entity, but different request IDs - only one should succeed. The middle one is
 
 270          * for a different target entity, so it should succeed.
 
 272         topics.inject(DCAE_TOPIC, DUPLICATES_ONSET_1, UUID.randomUUID().toString());
 
 273         topics.inject(DCAE_TOPIC, DUPLICATES_ONSET_2);
 
 274         topics.inject(DCAE_TOPIC, DUPLICATES_ONSET_1, UUID.randomUUID().toString());
 
 276         // should see two restarts
 
 277         for (var count = 0; count < 2; ++count) {
 
 278             AppcLcmDmaapWrapper appcreq = appcLcmRead.await(req -> APPC_RESTART_OP.equals(req.getRpcName()));
 
 281             topics.inject(APPC_LCM_WRITE_TOPIC, DUPLICATES_APPC_SUCCESS,
 
 282                             appcreq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 285         // should see two FINAL successes
 
 286         VirtualControlLoopNotification notif1 = waitForFinalSuccess(policy, policyClMgt);
 
 287         VirtualControlLoopNotification notif2 = waitForFinalSuccess(policy, policyClMgt);
 
 289         // get the list of target names, so we can ensure there's one of each
 
 290         List<String> actual = Stream.of(notif1, notif2).map(notif -> notif.getAai().get("generic-vnf.vnf-id"))
 
 291                         .sorted().collect(Collectors.toList());
 
 293         assertEquals(List.of("duplicate-VNF", "vCPE_Infrastructure_vGMUX_demo_app").toString(), actual.toString());
 
 295         long added = getCreateCount() - initCount;
 
 296         assertEquals(2, added);
 
 304      * Sunny Day with Tosca Compliant Policy.
 
 307     public void testVcpeSunnyDayCompliant() {
 
 308         appcLcmSunnyDay(VCPE_TOSCA_COMPLIANT_POLICY, VCPE_ONSET_1, APPC_RESTART_OP);
 
 312      * An ONSET flood prevention test that injects a few ONSETs at once. It attempts to
 
 313      * simulate the flooding behavior of the DCAE TCA microservice. TCA could blast tens
 
 314      * or hundreds of ONSETs within sub-second intervals.
 
 317     public void testVcpeOnsetFloodPrevention() {
 
 318         appcLcmSunnyDay(VCPE_TOSCA_COMPLIANT_POLICY, List.of(VCPE_ONSET_1, VCPE_ONSET_2, VCPE_ONSET_3),
 
 325      * Sunny Day with Tosca Compliant Policy.
 
 328     public void testVdnsSunnyDayCompliant() {
 
 329         httpSunnyDay(VDNS_TOSCA_COMPLIANT_POLICY, VDNS_ONSET);
 
 333      * Vdns Rainy Day with Compliant Tosca Policy.
 
 336     public void testVdnsRainyDayCompliant() {
 
 337         httpRainyDay(VDNS_TOSCA_COMPLIANT_RAINY_POLICY, VDNS_ONSET);
 
 344      * VFW Sunny Day with Tosca Compliant Policy.
 
 347     public void testVfwSunnyDayCompliant() {
 
 348         appcLegacySunnyDay(VFW_TOSCA_COMPLIANT_POLICY, VFW_ONSET, APPC_MODIFY_CONFIG_OP);
 
 352      * VFW Rainy Day using compliant tosca policy (final failure).
 
 355     public void testVfwRainyDayOverallTimeout() {
 
 356         appcLegacyRainyDayNoResponse(VFW_TOSCA_COMPLIANT_TIME_OUT_POLICY, VFW_ONSET, APPC_MODIFY_CONFIG_OP);
 
 360      * VFW Rainy day using compliant tosca policy (final failure due to timeout).
 
 363     public void testVfwRainyDayCompliantTimeout() {
 
 364         appcLegacyRainyDayNoResponse(VFW_TOSCA_COMPLIANT_POLICY, VFW_ONSET, APPC_MODIFY_CONFIG_OP);
 
 368      * VPCI Sunny Day Tosca Policy.
 
 371     public void testVpciSunnyDayCompliant() {
 
 372         sdnrSunnyDay(VPCI_TOSCA_COMPLIANT_POLICY, VPCI_ONSET, VPCI_SDNR_SUCCESS,
 
 373             SDNR_MODIFY_CONFIG_OP, SDNR_CL_TOPIC, SDNR_CL_RSP_TOPIC);
 
 379      * VSONH Sunny Day with Tosca Policy.
 
 382     public void testVsonhSunnyDayCompliant() {
 
 383         sdnrSunnyDay(VSONH_TOSCA_COMPLIANT_POLICY, VSONH_ONSET, VSONH_SDNR_SUCCESS,
 
 384             SNDR_MODIFY_CONFIG_ANR_OP, SDNR_CL_TOPIC, SDNR_CL_RSP_TOPIC);
 
 388      * Sunny day 5G SON 01 Modify01Config Operational Policy.
 
 391     public void test5gSonO1SunnyDayCompliant() {
 
 392         sdnrSunnyDay(V5G_SON_O1_TOSCA_POLICY, V5G_SON_O1_ONSET, V5G_SON_O1_SDNR_SUCCESS,
 
 393             MODIFY_O1_CONFIG_OPERATION, SDNR_CL_TOPIC, SDNR_CL_RSP_TOPIC);
 
 397      * Sunny day 5G SON A1 ModifyA1Policy Operational Policy.
 
 400     public void test5gSonA1SunnyDayCompliant() {
 
 401         sdnrSunnyDay(V5G_SON_A1_TOSCA_POLICY, V5G_SON_A1_ONSET, V5G_SON_A1_SDNR_SUCCESS,
 
 402             PUT_A1_POLICY_OPERATION, A1P_CL_TOPIC, A1P_CL_RSP_TOPIC);
 
 406      * Sunny day scenario for use cases that use APPC-LCM.
 
 408      * @param policyFile file containing the ToscaPolicy to be loaded
 
 409      * @param onsetFile file containing the ONSET to be injected
 
 410      * @param operation expected APPC operation request
 
 412     protected void appcLcmSunnyDay(String policyFile, String onsetFile, String operation) {
 
 413         appcLcmSunnyDay(policyFile, List.of(onsetFile), operation);
 
 417      * Sunny day scenario for use cases that use APPC-LCM.
 
 419      * @param policyFile file containing the ToscaPolicy to be loaded
 
 420      * @param onsetFiles list of files containing the ONSET to be injected
 
 421      * @param operation expected APPC operation request
 
 423     protected void appcLcmSunnyDay(String policyFile, List<String> onsetFiles, String operation) {
 
 424         policyClMgt = createNoficationTopicListener();
 
 425         appcLcmRead = topics.createListener(APPC_LCM_READ_TOPIC, AppcLcmDmaapWrapper.class, APPC_LCM_CODER);
 
 427         policy = checkPolicy(policyFile);
 
 430         // inject several ONSET events over the DCAE topic
 
 431         for (String onsetFile : onsetFiles) {
 
 432             topics.inject(DCAE_TOPIC, onsetFile);
 
 435         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 436         waitForLockAndPermit(policy, policyClMgt);
 
 439          * Ensure that an APPC RESTART request was sent in response to the matching ONSET
 
 441         AppcLcmDmaapWrapper appcreq = appcLcmRead.await(req -> operation.equals(req.getRpcName()));
 
 444          * Inject a 400 APPC Response Return over the APPC topic, with appropriate
 
 447         topics.inject(APPC_LCM_WRITE_TOPIC, VCPE_APPC_SUCCESS,
 
 448                         appcreq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 450         /* --- Operation Completed --- */
 
 452         waitForOperationSuccess();
 
 454         /* --- Transaction Completed --- */
 
 455         waitForFinalSuccess(policy, policyClMgt);
 
 461      * Sunny day scenario for use cases that use Legacy APPC.
 
 463      * @param policyFile file containing the ToscaPolicy to be loaded
 
 464      * @param onsetFile file containing the ONSET to be injected
 
 465      * @param operation expected APPC operation request
 
 467     protected void appcLegacySunnyDay(String policyFile, String onsetFile, String operation) {
 
 468         policyClMgt = createNoficationTopicListener();
 
 469         appcClSink = topics.createListener(APPC_CL_TOPIC, Request.class, APPC_LEGACY_CODER);
 
 471         policy = checkPolicy(policyFile);
 
 473         /* Inject an ONSET event over the DCAE topic */
 
 474         topics.inject(DCAE_TOPIC, onsetFile);
 
 476         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 477         waitForLockAndPermit(policy, policyClMgt);
 
 480          * Ensure that an APPC RESTART request was sent in response to the matching ONSET
 
 482         Request appcreq = appcClSink.await(req -> operation.equals(req.getAction()));
 
 485          * Inject a 400 APPC Response Return over the APPC topic, with appropriate
 
 488         topics.inject(APPC_CL_TOPIC, VFW_APPC_SUCCESS, appcreq.getCommonHeader().getSubRequestId());
 
 490         /* --- Operation Completed --- */
 
 492         waitForOperationSuccess();
 
 494         /* --- Transaction Completed --- */
 
 495         waitForFinalSuccess(policy, policyClMgt);
 
 501      * Rainy day scenario for use cases that use Legacy APPC.
 
 503      * @param policyFile file containing the ToscaPolicy to be loaded
 
 504      * @param onsetFile file containing the ONSET to be injected
 
 505      * @param operation expected APPC operation request
 
 507     protected void appcLegacyRainyDay(String policyFile, String onsetFile, String operation) {
 
 508         policyClMgt = createNoficationTopicListener();
 
 509         appcClSink = topics.createListener(APPC_CL_TOPIC, Request.class, APPC_LEGACY_CODER);
 
 511         policy = checkPolicy(policyFile);
 
 513         /* Inject an ONSET event over the DCAE topic */
 
 514         topics.inject(DCAE_TOPIC, onsetFile);
 
 516         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 517         waitForLockAndPermit(policy, policyClMgt);
 
 520          * Ensure that an APPC RESTART request was sent in response to the matching ONSET
 
 522         Request appcreq = appcClSink.await(req -> operation.equals(req.getAction()));
 
 525          * Inject a 401 APPC Response Return over the APPC topic, with appropriate
 
 528         topics.inject(APPC_CL_TOPIC, VFW_APPC_FAILURE, appcreq.getCommonHeader().getSubRequestId());
 
 530         /* --- Operation Completed --- */
 
 531         waitForOperationFailure();
 
 533         /* --- Transaction Completed --- */
 
 534         waitForFinalFailure(policy, policyClMgt);
 
 540      * Rainy day scenario for use cases that use Legacy APPC. Expected to fail due to
 
 543      * @param policyFile file containing the ToscaPolicy to be loaded
 
 544      * @param onsetFile file containing the ONSET to be injected
 
 545      * @param operation expected APPC operation request
 
 547     protected void appcLegacyRainyDayNoResponse(String policyFile, String onsetFile, String operation) {
 
 548         policyClMgt = createNoficationTopicListener();
 
 549         appcClSink = topics.createListener(APPC_CL_TOPIC, Request.class, APPC_LEGACY_CODER);
 
 551         policy = checkPolicy(policyFile);
 
 553         /* Inject an ONSET event over the DCAE topic */
 
 554         topics.inject(DCAE_TOPIC, onsetFile);
 
 556         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 557         waitForLockAndPermit(policy, policyClMgt);
 
 560          * Ensure that an APPC RESTART request was sent in response to the matching ONSET
 
 562         appcClSink.await(req -> operation.equals(req.getAction()));
 
 565          * Do not inject an APPC Response.
 
 568         /* --- Transaction Completed --- */
 
 569         waitForFinalFailure(policy, policyClMgt);
 
 575      * Sunny day scenario for use cases that use SDNR.
 
 577      * @param policyFile file containing the ToscaPolicy to be loaded
 
 578      * @param onsetFile file containing the ONSET to be injected
 
 579      * @param operation expected SDNR operation request
 
 581     protected void sdnrSunnyDay(String policyFile, String onsetFile,
 
 582                                 String successFile, String operation,
 
 583                                 String requestTopic, String responseTopic) {
 
 584         policyClMgt = createNoficationTopicListener();
 
 585         sdnrClSink = topics.createListener(requestTopic, PciMessage.class, SDNR_CODER);
 
 587         policy = checkPolicy(policyFile);
 
 589         /* Inject an ONSET event over the DCAE topic */
 
 590         topics.inject(DCAE_TOPIC, onsetFile);
 
 592         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 593         waitForLockAndPermit(policy, policyClMgt);
 
 596          * Ensure that an SDNR RESTART request was sent in response to the matching ONSET
 
 598         PciMessage pcireq = sdnrClSink.await(req -> operation.equals(req.getBody().getInput().getAction()));
 
 603         topics.inject(responseTopic, successFile, pcireq.getBody().getInput().getCommonHeader().getSubRequestId());
 
 605         /* --- Operation Completed --- */
 
 607         waitForOperationSuccess();
 
 609         /* --- Transaction Completed --- */
 
 610         waitForFinalSuccess(policy, policyClMgt);
 
 616      * Sunny day scenario for use cases that use an HTTP simulator.
 
 618      * @param policyFile file containing the ToscaPolicy to be loaded
 
 619      * @param onsetFile file containing the ONSET to be injected
 
 621     protected void httpSunnyDay(String policyFile, String onsetFile) {
 
 622         policyClMgt = createNoficationTopicListener();
 
 624         policy = checkPolicy(policyFile);
 
 626         /* Inject an ONSET event over the DCAE topic */
 
 627         topics.inject(DCAE_TOPIC, onsetFile);
 
 629         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 630         waitForLockAndPermit(policy, policyClMgt);
 
 632         /* --- Operation Completed --- */
 
 634         waitForOperationSuccess();
 
 636         /* --- Transaction Completed --- */
 
 637         waitForFinalSuccess(policy, policyClMgt);
 
 643      * Rainy day scenario for use cases that use an HTTP simulator.
 
 645      * @param policyFile file containing the ToscaPolicy to be loaded
 
 646      * @param onsetFile file containing the ONSET to be injected
 
 648     protected void httpRainyDay(String policyFile, String onsetFile) {
 
 649         policyClMgt = createNoficationTopicListener();
 
 651         policy = checkPolicy(policyFile);
 
 653         /* Inject an ONSET event over the DCAE topic */
 
 654         topics.inject(DCAE_TOPIC, onsetFile);
 
 656         /* Wait to acquire a LOCK and a PDP-X PERMIT */
 
 657         waitForLockAndPermit(policy, policyClMgt);
 
 659         /* --- Operation Completed --- */
 
 660         waitForOperationFailure();
 
 662         /* --- Transaction Completed --- */
 
 663         waitForFinalFailure(policy, policyClMgt);
 
 668     protected long getCreateCount() {
 
 669         return ControlLoopEventManager.getCreateCount();
 
 673      * Waits for a OPERATION SUCCESS transaction notification.
 
 675     protected void waitForOperationSuccess() {
 
 676         policyClMgt.await(notif -> notif.getNotification() == ControlLoopNotificationType.OPERATION_SUCCESS);
 
 680      * Waits for a FINAL SUCCESS transaction notification.
 
 682      * @return the FINAL SUCCESS notification
 
 684     protected VirtualControlLoopNotification waitForFinalSuccess(ToscaPolicy policy,
 
 685                     Listener<VirtualControlLoopNotification> policyClMgt) {
 
 687         return this.waitForFinal(policy, policyClMgt, ControlLoopNotificationType.FINAL_SUCCESS);
 
 691      * Waits for a OPERATION FAILURE transaction notification.
 
 693     protected void waitForOperationFailure() {
 
 694         policyClMgt.await(notif -> notif.getNotification() == ControlLoopNotificationType.OPERATION_FAILURE);
 
 698      * Waits for a FINAL FAILURE transaction notification.
 
 700      * @return the FINAL FAILURE notification
 
 702     protected VirtualControlLoopNotification waitForFinalFailure(ToscaPolicy policy,
 
 703                     Listener<VirtualControlLoopNotification> policyClMgt) {
 
 705         return this.waitForFinal(policy, policyClMgt, ControlLoopNotificationType.FINAL_FAILURE);
 
 709      * Waits for notifications for LOCK acquisition and GUARD Permit so that event
 
 710      * processing may proceed.
 
 712     protected abstract void waitForLockAndPermit(ToscaPolicy policy,
 
 713                     Listener<VirtualControlLoopNotification> policyClMgt);
 
 716      * Waits for a FINAL transaction notification.
 
 718      * @param finalType FINAL_xxx type for which to wait
 
 720      * @return the FINAL notification
 
 722     protected abstract VirtualControlLoopNotification waitForFinal(ToscaPolicy policy,
 
 723                     Listener<VirtualControlLoopNotification> policyClMgt, ControlLoopNotificationType finalType);
 
 726      * Returns ToscaPolicy from File.
 
 728      * @param fileName a path name
 
 729      * @return ToscaPolicy
 
 731     protected ToscaPolicy checkPolicy(String fileName) {
 
 733             return Rules.getPolicyFromFile(fileName);
 
 734         } catch (CoderException e) {
 
 735             throw new IllegalArgumentException(fileName, e);
 
 740      * Creates a Coder for PolicyClMgt from StandardCoder.
 
 743     public static class PolicyClMgtCoder extends StandardCoder {
 
 744         public PolicyClMgtCoder() {
 
 745             super(org.onap.policy.controlloop.util.Serialization.gson,
 
 746                             org.onap.policy.controlloop.util.Serialization.gsonPretty);
 
 751      * Returns Listener from createListener based on Coder.
 
 753      * @return the Listener
 
 755     protected Listener<VirtualControlLoopNotification> createNoficationTopicListener() {
 
 756         return topics.createListener(POLICY_CL_MGT_TOPIC, VirtualControlLoopNotification.class, POLICY_CL_MGT_CODER);
 
 760      * Verifies that all locks have been released, waiting a bit, if necessary.
 
 762     private void verifyUnlocked() {
 
 763         Map<String, SimpleLock> locks = getLockMap();
 
 764         if (!MapUtils.isEmpty(locks)) {
 
 765             Awaitility.await().atMost(5, TimeUnit.SECONDS).until(locks::isEmpty);
 
 769     private Map<String, SimpleLock> getLockMap() {
 
 770         Object lockMgr = Whitebox.getInternalState(PolicyEngineConstants.getManager(), "lockManager");
 
 771         if (lockMgr instanceof SimpleLockManager) {
 
 772             return Whitebox.getInternalState(lockMgr, "resource2lock");
 
 775         return Collections.emptyMap();