extend vnf properties model for cds actors 24/126324/6
authorjhh <jorge.hernandez-herrero@att.com>
Thu, 16 Dec 2021 22:15:33 +0000 (16:15 -0600)
committerjhh <jorge.hernandez-herrero@att.com>
Fri, 7 Jan 2022 23:48:21 +0000 (17:48 -0600)
CDS operations for VNFs default to vFirewall-like behavior for
AAI queries where vserver and target resources are required.

The properties refactoring allows for enriched
vnf-id behavior (similar to vCPE) and provides some leg work
for other extensions.

Fix dmaap simulator sporadic test errors in junits

Issue-ID: POLICY-3865
Signed-off-by: jhh <jorge.hernandez-herrero@att.com>
Change-Id: Iad35e4f2a9e10b1a49818c91b9fec7520e844b8c

12 files changed:
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfProperties.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfProperties.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationProperties.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfProperties.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java
models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfPropertiesTest.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfPropertiesTest.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPropertiesTest.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfPropertiesTest.java [new file with mode: 0644]
models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
models-interactions/model-simulators/src/test/java/org/onap/policy/simulators/DmaapSimulatorTest.java

index 87eaa95..87d9403 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  * Copyright (C) 2020 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,27 +26,22 @@ import com.google.protobuf.Struct;
 import com.google.protobuf.util.JsonFormat;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 import lombok.Getter;
-import org.onap.aai.domain.yang.GenericVnf;
-import org.onap.aai.domain.yang.ServiceInstance;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
+import org.onap.policy.controlloop.actor.cds.properties.GrpcOperationProperties;
 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
-import org.onap.policy.controlloop.actorserviceprovider.TargetType;
-import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
@@ -60,10 +55,6 @@ public class GrpcOperation extends OperationPartial {
 
     public static final String NAME = "any";
 
-    private static final String AAI_PNF_PREFIX = "pnf.";
-    private static final String AAI_VNF_ID_KEY = "generic-vnf.vnf-id";
-    private static final String AAI_SERVICE_INSTANCE_ID_KEY = "service-instance.service-instance-id";
-
     private CdsProcessorGrpcClient client;
 
     /**
@@ -72,24 +63,9 @@ public class GrpcOperation extends OperationPartial {
     private final GrpcConfig config;
 
     /**
-     * Function to convert the A&AI data associated with the target type.
+     * Operation properties.
      */
-    private final Supplier<Map<String, String>> aaiConverter;
-
-
-    // @formatter:off
-    private static final List<String> PNF_PROPERTY_NAMES = List.of(
-                            OperationProperties.AAI_PNF,
-                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
-                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
-
-
-    private static final List<String> VNF_PROPERTY_NAMES = List.of(
-                            OperationProperties.AAI_RESOURCE_VNF,
-                            OperationProperties.AAI_SERVICE,
-                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
-                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
-    // @formatter:on
+    private final GrpcOperationProperties opProperties;
 
     /**
      * Constructs the object.
@@ -100,17 +76,12 @@ public class GrpcOperation extends OperationPartial {
     public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) {
         super(params, config, Collections.emptyList());
         this.config = config;
-
-        if (TargetType.PNF.equals(params.getTargetType())) {
-            aaiConverter = this::convertPnfToAaiProperties;
-        } else {
-            aaiConverter = this::convertVnfToAaiProperties;
-        }
+        this.opProperties = GrpcOperationProperties.build(params);
     }
 
     @Override
     public List<String> getPropertyNames() {
-        return (TargetType.PNF.equals(params.getTargetType()) ? PNF_PROPERTY_NAMES : VNF_PROPERTY_NAMES);
+        return this.opProperties.getPropertyNames();
     }
 
     /**
@@ -121,62 +92,6 @@ public class GrpcOperation extends OperationPartial {
         return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
     }
 
-    /**
-     * Converts the A&AI PNF data to a map suitable for passing via the "aaiProperties"
-     * field in the CDS request.
-     *
-     * @return a map of the PNF data
-     */
-    private Map<String, String> convertPnfToAaiProperties() {
-        Map<String, String> result = getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
-        if (result != null) {
-            return result;
-        }
-
-        // convert PNF data to a Map
-        Object pnfData = getRequiredProperty(OperationProperties.AAI_PNF, "PNF");
-        Map<String, Object> source = Util.translateToMap(getFullName(), pnfData);
-
-        result = new LinkedHashMap<>();
-
-        for (Entry<String, Object> ent : source.entrySet()) {
-            result.put(AAI_PNF_PREFIX + ent.getKey(), ent.getValue().toString());
-        }
-
-        return result;
-    }
-
-    /**
-     * Converts the A&AI VNF data to a map suitable for passing via the
-     * "aaiProperties" field in the CDS request.
-     *
-     * @return a map of the VNF data
-     */
-    private Map<String, String> convertVnfToAaiProperties() {
-        Map<String, String> result = getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
-        if (result != null) {
-            return result;
-        }
-
-        result = new LinkedHashMap<>();
-
-        result.put(AAI_SERVICE_INSTANCE_ID_KEY, getServiceInstanceId());
-        result.put(AAI_VNF_ID_KEY, getVnfId());
-
-        return result;
-    }
-
-    protected String getServiceInstanceId() {
-        var serviceInstance = (ServiceInstance) getRequiredProperty(OperationProperties.AAI_SERVICE,
-                        "Target service instance");
-        return serviceInstance.getServiceInstanceId();
-    }
-
-    protected String getVnfId() {
-        var genericVnf = (GenericVnf) getRequiredProperty(OperationProperties.AAI_RESOURCE_VNF, "Target generic vnf");
-        return genericVnf.getVnfId();
-    }
-
     @Override
     public void generateSubRequestId(int attempt) {
         setSubRequestId("0");
@@ -251,7 +166,7 @@ public class GrpcOperation extends OperationPartial {
         //
         // Note: that is a future enhancement. For now, the actor is hard-coded to
         // use the A&AI query result specific to the target type
-        request.setAaiProperties(aaiConverter.get());
+        request.setAaiProperties(opProperties.convertToAaiProperties(this));
 
         // Inject any additional event parameters that may be present in the onset event
         Map<String, String> additionalParams = getProperty(OperationProperties.EVENT_ADDITIONAL_PARAMS);
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfProperties.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfProperties.java
new file mode 100644 (file)
index 0000000..5fcd307
--- /dev/null
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.onap.aai.domain.yang.GenericVnf;
+import org.onap.aai.domain.yang.ServiceInstance;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+
+/**
+ *  VNF with target entities properties.
+ */
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class GrcpOperationResourceVnfProperties extends GrpcOperationProperties {
+
+    // @formatter:off
+    protected static final List<String> VNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_RESOURCE_VNF,
+                            OperationProperties.AAI_SERVICE,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+    // @formatter:on
+
+    @Override
+    public List<String> getPropertyNames() {
+        return VNF_PROPERTY_NAMES;
+    }
+
+    @Override
+    public Map<String, String> convertToAaiProperties(GrpcOperation operation) {
+        Map<String, String> result = operation.getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+        if (result != null) {
+            return result;
+        }
+
+        result = new LinkedHashMap<>();
+        result.put(AAI_SERVICE_INSTANCE_ID_KEY, getServiceInstanceId(operation));
+        result.put(AAI_VNF_ID_KEY, getVnfId(operation));
+        return result;
+    }
+
+    /**
+     * Get the service instance id from the required operation property.
+     */
+    public String getServiceInstanceId(GrpcOperation operation) {
+        var serviceInstance =
+            (ServiceInstance) operation.getRequiredProperty(OperationProperties.AAI_SERVICE,
+                        "Target service instance");
+        return serviceInstance.getServiceInstanceId();
+    }
+
+    /**
+     * Get the vnf id from the required property.
+     */
+    public String getVnfId(GrpcOperation operation) {
+        var genericVnf =
+            (GenericVnf) operation.getRequiredProperty(OperationProperties.AAI_RESOURCE_VNF, "Target generic vnf");
+        return genericVnf.getVnfId();
+    }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfProperties.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfProperties.java
new file mode 100644 (file)
index 0000000..4d9316a
--- /dev/null
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+
+/**
+ * GRPC Operation with PNF properties.
+ */
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class GrpcOperationPnfProperties extends GrpcOperationProperties {
+
+    // @formatter:off
+    protected static final List<String> PNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_PNF,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+    // @formatter:on
+
+    @Override
+    public List<String> getPropertyNames() {
+        return PNF_PROPERTY_NAMES;
+    }
+
+    @Override
+    public Map<String, String> convertToAaiProperties(GrpcOperation operation) {
+        Map<String, String> result = operation.getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+        if (result != null) {
+            return result;
+        }
+
+        Object pnfData = operation.getRequiredProperty(OperationProperties.AAI_PNF, "PNF");
+        Map<String, Object> source = Util.translateToMap(operation.getFullName(), pnfData);
+
+        result = new LinkedHashMap<>();
+
+        for (Map.Entry<String, Object> ent : source.entrySet()) {
+            result.put(AAI_PNF_PREFIX + ent.getKey(), ent.getValue().toString());
+        }
+
+        return result;
+    }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationProperties.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationProperties.java
new file mode 100644 (file)
index 0000000..fd49ea5
--- /dev/null
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+/**
+ * GRPC Operation Base Class.
+ */
+@Data
+public abstract class GrpcOperationProperties {
+    protected static final String AAI_PNF_PREFIX = "pnf.";
+
+    /**
+     * AAI VNF Identifier.
+     */
+    public static final String AAI_VNF_ID_KEY = "generic-vnf.vnf-id";
+
+    /**
+     * AAI Service Instance Identifier.
+     */
+    public static final String AAI_SERVICE_INSTANCE_ID_KEY = "service-instance.service-instance-id";
+
+    private static final Logger logger = LoggerFactory.getLogger(GrpcOperationProperties.class);
+
+    /**
+     * Build the appropriate GrpcOperation object depending on the target type.
+     */
+    public static GrpcOperationProperties build(ControlLoopOperationParams params) {
+        if (TargetType.PNF.equals(params.getTargetType())) {
+            return new GrpcOperationPnfProperties();
+        }
+
+        // assume VNF processing for backwards compatibility with istanbul
+
+        if (!TargetType.VNF.equals(params.getTargetType())) {
+            logger.warn("Unexpected target type, build VNF-like operation properties");
+        }
+
+        return CollectionUtils.isEmpty(params.getTargetEntityIds())
+                ? new GrpcOperationTargetVnfProperties()
+                : new GrcpOperationResourceVnfProperties();
+    }
+
+    /**
+     * Get the property names.
+     */
+    public abstract List<String> getPropertyNames();
+
+    /**
+     * Convert to the AAI properties.
+     */
+    public abstract Map<String, String> convertToAaiProperties(GrpcOperation operation);
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfProperties.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfProperties.java
new file mode 100644 (file)
index 0000000..6c5a97d
--- /dev/null
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+
+/**
+ * GRPC Operation with Target VNF properties.
+ */
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class GrpcOperationTargetVnfProperties extends GrpcOperationProperties {
+
+    // @formatter:off
+    protected static final List<String> VNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_TARGET_ENTITY,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+    // @formatter:on
+
+    @Override
+    public List<String> getPropertyNames() {
+        return VNF_PROPERTY_NAMES;
+    }
+
+    @Override
+    public Map<String, String> convertToAaiProperties(GrpcOperation operation) {
+        Map<String, String> result = operation.getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+        if (result != null) {
+            return result;
+        }
+
+        result = new LinkedHashMap<>();
+        result.put(AAI_VNF_ID_KEY,
+            operation.getRequiredProperty(OperationProperties.AAI_TARGET_ENTITY, "vnf id"));
+        return result;
+    }
+}
index 0809291..cae048b 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  * Copyright (C) 2020 Bell Canada. All rights reserved.
- * Modifications Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ package org.onap.policy.controlloop.actor.cds;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -42,7 +43,6 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.aai.domain.yang.GenericVnf;
 import org.onap.aai.domain.yang.ServiceInstance;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
-import org.onap.policy.cds.client.CdsProcessorGrpcClient;
 import org.onap.policy.cds.properties.CdsServerProperties;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
@@ -50,6 +50,7 @@ import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
+import org.onap.policy.controlloop.actor.cds.properties.GrpcOperationProperties;
 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
@@ -78,7 +79,6 @@ public class GrpcOperationTest {
     private static CdsSimulator sim;
 
     @Mock
-    private CdsProcessorGrpcClient cdsClient;
     private CdsServerProperties cdsProps;
     private PseudoExecutor executor;
     private Map<String, String> targetEntityIds;
@@ -100,7 +100,7 @@ public class GrpcOperationTest {
      * Sets up the fields.
      */
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         // Setup the CDS properties
         cdsProps = new CdsServerProperties();
         cdsProps.setHost("10.10.10.10");
@@ -150,19 +150,33 @@ public class GrpcOperationTest {
 
     @Test
     public void testGetPropertyNames() {
+        /*
+         * check VNF case with target entities
+         */
+        params = params.toBuilder().targetType(TargetType.VNF).targetEntityIds(targetEntityIds).build();
+        operation = new GrpcOperation(params, config);
+
+        // @formatter:off
+        assertThat(operation.getPropertyNames()).isEqualTo(
+                List.of(
+                        OperationProperties.AAI_RESOURCE_VNF,
+                        OperationProperties.AAI_SERVICE,
+                        OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                        OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES));
+        // @formatter:on
 
         /*
-         * check VNF case
+         * check VNF case with no target entities
          */
+        params = params.toBuilder().targetEntityIds(null).build();
         operation = new GrpcOperation(params, config);
 
         // @formatter:off
         assertThat(operation.getPropertyNames()).isEqualTo(
-                        List.of(
-                            OperationProperties.AAI_RESOURCE_VNF,
-                            OperationProperties.AAI_SERVICE,
-                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
-                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES));
+                List.of(
+                        OperationProperties.AAI_TARGET_ENTITY,
+                        OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                        OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES));
         // @formatter:on
 
         /*
@@ -182,47 +196,82 @@ public class GrpcOperationTest {
 
     @Test
     public void testGetServiceInstanceId() {
+        params = params.toBuilder().targetType(TargetType.VNF).targetEntityIds(targetEntityIds).build();
         operation = new GrpcOperation(params, config);
         loadVnfData();
-        assertEquals(MY_SVC_ID, operation.getServiceInstanceId());
+        assertEquals(MY_SVC_ID,
+            operation.getOpProperties()
+                    .convertToAaiProperties(operation)
+                    .get(GrpcOperationProperties.AAI_SERVICE_INSTANCE_ID_KEY));
     }
 
     @Test
     public void testGetVnfId() {
+        params = params.toBuilder().targetType(TargetType.VNF).targetEntityIds(targetEntityIds).build();
         operation = new GrpcOperation(params, config);
         loadVnfData();
-        assertEquals(MY_VNF, operation.getVnfId());
+        assertEquals(MY_VNF,
+            operation.getOpProperties()
+                    .convertToAaiProperties(operation)
+                    .get(GrpcOperationProperties.AAI_VNF_ID_KEY));
+
+        params = params.toBuilder().targetEntityIds(null).build();
+        operation = new GrpcOperation(params, config);
+        assertThatIllegalStateException().isThrownBy(()
+            -> operation.getOpProperties()
+                       .convertToAaiProperties(operation)
+                       .get(GrpcOperationProperties.AAI_VNF_ID_KEY));
+
+        operation.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_VNF);
+        assertEquals(MY_VNF,
+                operation.getOpProperties()
+                        .convertToAaiProperties(operation)
+                        .get(GrpcOperationProperties.AAI_VNF_ID_KEY));
+        operation.setProperty(OperationProperties.AAI_TARGET_ENTITY, null);
     }
 
     @Test
-    public void testStartOperationAsync() throws Exception {
-        verifyOperation(TargetType.VNF, this::loadVnfData);
+    public void testStartOperationAsync() {
+        ControlLoopOperationParams clop =
+                ControlLoopOperationParams.builder().actor(CdsActorConstants.CDS_ACTOR)
+                        .operation(GrpcOperation.NAME)
+                        .requestId(REQUEST_ID)
+                        .actorService(new ActorService())
+                        .targetType(TargetType.VNF)
+                        .build();
+
+        verifyOperation(clop, () -> operation.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_VNF));
+        verifyOperation(clop.toBuilder().targetEntityIds(targetEntityIds).build(), this::loadVnfData);
     }
 
     /**
      * Tests startOperationAsync() when the target type is PNF.
      */
     @Test
-    public void testStartOperationAsyncPnf() throws Exception {
-        verifyOperation(TargetType.PNF, this::loadPnfData);
+    public void testStartOperationAsyncPnf() {
+        ControlLoopOperationParams clop =
+                ControlLoopOperationParams.builder().actor(CdsActorConstants.CDS_ACTOR)
+                        .operation(GrpcOperation.NAME)
+                        .requestId(REQUEST_ID)
+                        .actorService(new ActorService())
+                        .targetType(TargetType.PNF)
+                        .build();
+
+        verifyOperation(clop, this::loadPnfData);
     }
 
     @Test
-    public void testStartOperationAsyncError() throws Exception {
+    public void testStartOperationAsyncError() {
         operation = new GrpcOperation(params, config);
         assertThatIllegalArgumentException()
                         .isThrownBy(() -> operation.startOperationAsync(1, params.makeOutcome()));
     }
 
-    private void verifyOperation(TargetType targetType, Runnable loader) {
-
+    private void verifyOperation(ControlLoopOperationParams clop, Runnable loader) {
         Map<String, Object> payloadMap = Map.of(CdsActorConstants.KEY_CBA_NAME, CDS_BLUEPRINT_NAME,
                         CdsActorConstants.KEY_CBA_VERSION, CDS_BLUEPRINT_VERSION, "data",
                         "{\"mapInfo\":{\"key\":\"val\"},\"arrayInfo\":[\"one\",\"two\"],\"paramInfo\":\"val\"}");
-
-        ControlLoopOperationParams params = ControlLoopOperationParams.builder().actor(CdsActorConstants.CDS_ACTOR)
-                        .operation(GrpcOperation.NAME).requestId(REQUEST_ID).actorService(new ActorService())
-                        .targetType(targetType).payload(payloadMap).build();
+        params = clop.toBuilder().payload(payloadMap).build();
 
         GrpcConfig config = new GrpcConfig(executor, cdsProps);
         operation = new GrpcOperation(params, config);
diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfPropertiesTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrcpOperationResourceVnfPropertiesTest.java
new file mode 100644 (file)
index 0000000..44ebc12
--- /dev/null
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Test;
+import org.onap.aai.domain.yang.GenericVnf;
+import org.onap.aai.domain.yang.ServiceInstance;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.utils.time.PseudoExecutor;
+import org.onap.policy.controlloop.actor.cds.GrpcConfig;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+public class GrcpOperationResourceVnfPropertiesTest {
+
+    @Test
+    public void getPropertyNames() {
+        assertEquals(GrcpOperationResourceVnfProperties.VNF_PROPERTY_NAMES,
+                new GrcpOperationResourceVnfProperties().getPropertyNames());
+    }
+
+    @Test
+    public void convertToAaiProperties() {
+        ControlLoopOperationParams params =
+                ControlLoopOperationParams.builder()
+                        .targetType(TargetType.VNF)
+                        .build();
+
+        GrpcOperation operation =
+                new GrpcOperation(params, new GrpcConfig(new PseudoExecutor(), new CdsServerProperties()));
+
+        GrcpOperationResourceVnfProperties properties = new GrcpOperationResourceVnfProperties();
+        assertThatIllegalStateException()
+                .isThrownBy(() -> properties.convertToAaiProperties(operation));
+
+        GenericVnf genvnf = new GenericVnf();
+        genvnf.setVnfId("v");
+        operation.setProperty(OperationProperties.AAI_RESOURCE_VNF, genvnf);
+
+        ServiceInstance serviceInstance = new ServiceInstance();
+        serviceInstance.setServiceInstanceId("s");
+        operation.setProperty(OperationProperties.AAI_SERVICE, serviceInstance);
+
+        assertEquals("s",
+                properties.convertToAaiProperties(operation)
+                        .get(GrcpOperationResourceVnfProperties.AAI_SERVICE_INSTANCE_ID_KEY));
+
+        assertEquals("v",
+                properties.convertToAaiProperties(operation)
+                        .get(GrcpOperationResourceVnfProperties.AAI_VNF_ID_KEY));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES, Collections.emptyMap());
+
+        assertNull(properties.convertToAaiProperties(operation)
+                           .get(GrcpOperationResourceVnfProperties.AAI_SERVICE_INSTANCE_ID_KEY));
+
+        assertNull(properties.convertToAaiProperties(operation)
+                           .get(GrcpOperationResourceVnfProperties.AAI_VNF_ID_KEY));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES,
+                Map.of("aa", "AA"));
+        assertEquals("AA",
+                properties.convertToAaiProperties(operation)
+                        .get("aa"));
+    }
+}
\ No newline at end of file
diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfPropertiesTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPnfPropertiesTest.java
new file mode 100644 (file)
index 0000000..828eded
--- /dev/null
@@ -0,0 +1,85 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Test;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.common.utils.time.PseudoExecutor;
+import org.onap.policy.controlloop.actor.cds.GrpcConfig;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+public class GrpcOperationPnfPropertiesTest {
+    private static final Coder coder = new StandardCoder();
+
+    @Test
+    public void getPropertyNames() {
+        assertEquals(GrpcOperationPnfProperties.PNF_PROPERTY_NAMES,
+                new GrpcOperationPnfProperties().getPropertyNames());
+    }
+
+    @Test
+    public void convertToAaiProperties() throws CoderException {
+        ControlLoopOperationParams params =
+                ControlLoopOperationParams.builder()
+                        .targetType(TargetType.PNF)
+                        .build();
+
+        GrpcOperation operation =
+                new GrpcOperation(params, new GrpcConfig(new PseudoExecutor(), new CdsServerProperties()));
+
+        GrpcOperationPnfProperties properties = new GrpcOperationPnfProperties();
+        assertThatIllegalStateException()
+                .isThrownBy(() -> properties.convertToAaiProperties(operation));
+
+        String pnf = "{'dataA': 'valueA', 'dataB': 'valueB'}".replace('\'', '"');
+        StandardCoderObject sco = coder.decode(pnf, StandardCoderObject.class);
+        operation.setProperty(OperationProperties.AAI_PNF, sco);
+
+        assertEquals("valueA",
+                properties.convertToAaiProperties(operation)
+                        .get(GrpcOperationPnfProperties.AAI_PNF_PREFIX + "dataA"));
+
+        assertEquals("valueB",
+                properties.convertToAaiProperties(operation)
+                        .get(GrpcOperationPnfProperties.AAI_PNF_PREFIX + "dataB"));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES, Collections.emptyMap());
+        assertNull(properties.convertToAaiProperties(operation)
+                        .get(GrpcOperationPnfProperties.AAI_PNF_PREFIX + "dataA"));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES,
+                Map.of("dataC", "valueC"));
+        assertEquals("valueC",
+                properties.convertToAaiProperties(operation)
+                        .get("dataC"));
+    }
+}
\ No newline at end of file
diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPropertiesTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationPropertiesTest.java
new file mode 100644 (file)
index 0000000..514df4e
--- /dev/null
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+public class GrpcOperationPropertiesTest {
+
+    @Test
+    public void build() {
+        ControlLoopOperationParams params = ControlLoopOperationParams.builder().targetType(TargetType.VNF).build();
+        assertEquals(GrpcOperationTargetVnfProperties.VNF_PROPERTY_NAMES,
+                GrpcOperationProperties.build(params).getPropertyNames());
+
+        Map<String, String> targetEntityIds = new HashMap<>();
+        targetEntityIds.put(ControlLoopOperationParams.PARAMS_ENTITY_RESOURCEID, "R");
+
+        params = ControlLoopOperationParams.builder()
+                         .targetType(TargetType.VNF)
+                         .targetEntityIds(targetEntityIds)
+                         .build();
+        assertEquals(GrcpOperationResourceVnfProperties.VNF_PROPERTY_NAMES,
+                GrpcOperationProperties.build(params).getPropertyNames());
+
+        params = ControlLoopOperationParams.builder().targetType(TargetType.PNF).build();
+        assertEquals(GrpcOperationPnfProperties.PNF_PROPERTY_NAMES,
+                GrpcOperationProperties.build(params).getPropertyNames());
+
+        params = ControlLoopOperationParams.builder()
+                         .targetType(TargetType.VFC)
+                         .targetEntityIds(targetEntityIds)
+                         .build();
+        assertEquals(GrcpOperationResourceVnfProperties.VNF_PROPERTY_NAMES,
+                GrpcOperationProperties.build(params).getPropertyNames());
+    }
+}
\ No newline at end of file
diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfPropertiesTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/properties/GrpcOperationTargetVnfPropertiesTest.java
new file mode 100644 (file)
index 0000000..798a96b
--- /dev/null
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds.properties;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.Test;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.utils.time.PseudoExecutor;
+import org.onap.policy.controlloop.actor.cds.GrpcConfig;
+import org.onap.policy.controlloop.actor.cds.GrpcOperation;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+public class GrpcOperationTargetVnfPropertiesTest {
+
+    @Test
+    public void getPropertyNames() {
+        assertEquals(GrpcOperationTargetVnfProperties.VNF_PROPERTY_NAMES,
+                new GrpcOperationTargetVnfProperties().getPropertyNames());
+    }
+
+    @Test
+    public void convertToAaiProperties() {
+        ControlLoopOperationParams params =
+                ControlLoopOperationParams.builder()
+                        .targetType(TargetType.VNF)
+                        .build();
+
+        GrpcOperation operation =
+                new GrpcOperation(params, new GrpcConfig(new PseudoExecutor(), new CdsServerProperties()));
+
+        GrpcOperationTargetVnfProperties properties = new GrpcOperationTargetVnfProperties();
+        assertThatIllegalStateException()
+                .isThrownBy(() -> properties.convertToAaiProperties(operation));
+
+        operation.setProperty(OperationProperties.AAI_TARGET_ENTITY, "v");
+        assertEquals("v",
+                properties.convertToAaiProperties(operation)
+                        .get(GrpcOperationTargetVnfProperties.AAI_VNF_ID_KEY));
+
+        assertNull(properties.convertToAaiProperties(operation)
+                         .get(GrcpOperationResourceVnfProperties.AAI_SERVICE_INSTANCE_ID_KEY));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES, Collections.emptyMap());
+        assertNull(properties.convertToAaiProperties(operation)
+                           .get(GrpcOperationTargetVnfProperties.AAI_VNF_ID_KEY));
+
+        operation.setProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES,
+                Map.of("aa", "AA"));
+        assertEquals("AA",
+                properties.convertToAaiProperties(operation)
+                        .get("aa"));
+    }
+}
\ No newline at end of file
index 4f2ab02..3281ddb 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -165,7 +165,7 @@ public abstract class OperationPartial implements Operation {
      * @return the property value
      */
     @SuppressWarnings("unchecked")
-    protected <T> T getRequiredProperty(String name, String propertyType) {
+    public <T> T getRequiredProperty(String name, String propertyType) {
         T value = (T) properties.get(name);
         if (value == null) {
             throw new IllegalStateException("missing " + propertyType);
index 50e9bad..fab041f 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2022 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
 import org.onap.policy.common.utils.coder.StandardCoder;
 
 public class DmaapSimulatorTest {
-    private static final int MAX_WAIT_SEC = 2;
+    private static final int MAX_WAIT_SEC = 5;
     private static final String TOPIC = "MY-TOPIC";
 
     /**
@@ -86,9 +86,9 @@ public class DmaapSimulatorTest {
 
         DmaapTopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(TOPIC);
         sink.send("hello");
-        sink.send("world");
-
         assertEquals("hello", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+
+        sink.send("world");
         assertEquals("world", queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
     }
 }