Merge "GRPC Client impl to send process message to CDS blueprint-processor endpoint"
authorJorge Hernandez <jorge.hernandez-herrero@att.com>
Wed, 12 Jun 2019 18:28:59 +0000 (18:28 +0000)
committerGerrit Code Review <gerrit@onap.org>
Wed, 12 Jun 2019 18:28:59 +0000 (18:28 +0000)
models-interactions/model-impl/cds/pom.xml [new file with mode: 0644]
models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java [new file with mode: 0644]
models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java [new file with mode: 0644]
models-interactions/model-impl/pom.xml

diff --git a/models-interactions/model-impl/cds/pom.xml b/models-interactions/model-impl/cds/pom.xml
new file mode 100644 (file)
index 0000000..9e3b55b
--- /dev/null
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+  Copyright (C) 2019 Bell Canada.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>model-impl</artifactId>
+    <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
+    <version>2.1.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>cds</artifactId>
+  <name>${project.artifactId}</name>
+  <description>gRPC client implementation to send process message to CDS blueprint processor gRPC endpoint.</description>
+
+  <properties>
+    <grpc.version>1.17.1</grpc.version>
+    <protobuf.version>3.6.1</protobuf.version>
+    <grpc.netty.version>4.1.30.Final</grpc.netty.version>
+    <ccsdk.version>0.4.4</ccsdk.version>
+  </properties>
+
+  <dependencies>
+    <!-- CDS dependencies -->
+    <dependency>
+      <groupId>org.onap.ccsdk.cds.components</groupId>
+      <artifactId>proto-definition</artifactId>
+      <version>${ccsdk.version}</version>
+    </dependency>
+
+    <!-- protobuf dependencies -->
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+
+    <!-- gRPC dependencies -->
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${grpc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+      <version>${grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <version>${grpc.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Policy dependencies -->
+    <dependency>
+      <groupId>org.onap.policy.common</groupId>
+      <artifactId>common-parameters</artifactId>
+      <version>${policy.common.version}</version>
+    </dependency>
+
+    <!-- junit dependencies -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.13.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java
new file mode 100644 (file)
index 0000000..c07c559
--- /dev/null
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+
+/**
+ * <p>
+ * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle
+ * the received messages appropriately, it needs to implement {@link CdsProcessorListener}.
+ * </p>
+ *
+ * <p>Here is a sample implementation of a listener:
+ * <pre>
+ * new CdsProcessorListener {
+ *
+ *     &#64;Override
+ *     public void onMessage(ExecutionServiceOutput message) {
+ *         log.info("Received notification from CDS: {}", message);
+ *     }
+ *
+ *     &#64;Override
+ *     public void onError(Throwable throwable) {
+ *         Status status = Status.fromThrowable(throwable);
+ *         log.error("Failed processing blueprint {}", status, throwable);
+ *     }
+ * }
+ * </pre>
+ * </p>
+ */
+public interface CdsProcessorListener {
+
+    /**
+     * Implements the workflow upon receiving the message from the server side.
+     *
+     * <p>Note that the CDS client-server communication is configured to use a streaming approach, which means when
+     * client
+     * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence,
+     * it is up to the implementation of this method to process the received message using {@link
+     * ExecutionServiceOutput#getStatus()#getEventType()}</p>
+     *
+     * @param message ExecutionServiceOutput received by the CDS grpc server
+     */
+    void onMessage(ExecutionServiceOutput message);
+
+    /**
+     * Implements the workflow when an error is received from the server side.
+     *
+     * @param throwable Throwable object received from CDS grpc server upon error
+     */
+    void onError(Throwable throwable);
+
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java
new file mode 100644 (file)
index 0000000..3957fe5
--- /dev/null
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+/**
+ * An interceptor to insert the client authHeader.
+ *
+ * <p>The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization
+ * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.</p>
+ *
+ * <p>On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded
+ * username:password as its value.
+ * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract
+ * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.</p>
+ */
+public class BasicAuthClientHeaderInterceptor implements ClientInterceptor {
+
+    static final String BASIC_AUTH_HEADER_KEY = "Authorization";
+    private CdsServerProperties props;
+
+    BasicAuthClientHeaderInterceptor(CdsServerProperties props) {
+        this.props = props;
+    }
+
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+        CallOptions callOptions, Channel channel) {
+        Key<String> authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+            @Override
+            public void start(Listener<RespT> responseListener, Metadata headers) {
+                headers.put(authHeader, props.getBasicAuth());
+                super.start(responseListener, headers);
+            }
+        };
+    }
+}
+
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java
new file mode 100644 (file)
index 0000000..b8ec7ac
--- /dev/null
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import com.google.common.base.Preconditions;
+import io.grpc.ManagedChannel;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.internal.PickFirstLoadBalancerProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use
+ * a streaming approach, which means the client sends an event to which the server can reply with multiple
+ * sub-responses, until full completion of the processing.
+ * </p>
+ */
+public class CdsProcessorGrpcClient implements AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class);
+
+    private ManagedChannel channel;
+    private CdsProcessorHandler handler;
+
+    /**
+     * Constructor, create a CDS processor gRPC client.
+     *
+     * @param listener the listener to listen on
+     */
+    public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) {
+        final GroupValidationResult validationResult = props.validate();
+        Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server "
+            + "properties: " + validationResult.getResult());
+
+        this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort())
+            .nameResolverFactory(new DnsNameResolverProvider())
+            .loadBalancerFactory(new PickFirstLoadBalancerProvider())
+            .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build();
+        this.handler = new CdsProcessorHandler(listener);
+        LOGGER.info("CdsProcessorListener started");
+    }
+
+    CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) {
+        this.channel = channel;
+        this.handler = handler;
+    }
+
+    /**
+     * Sends a request to the CDS backend micro-service.
+     *
+     * <p>The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The
+     * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter
+     * will decrement.</p>
+     *
+     * <p>It is the user responsibility to close the client.</p>
+     *
+     * @param input request to send
+     * @return CountDownLatch instance that can be use to #await for completeness of processing
+     */
+    public CountDownLatch sendRequest(ExecutionServiceInput input) {
+        return handler.process(input, channel);
+    }
+
+    @Override
+    public void close() {
+        if (channel != null) {
+            channel.shutdown();
+        }
+        LOGGER.info("CdsProcessorListener stopped");
+    }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java
new file mode 100644 (file)
index 0000000..9dd249c
--- /dev/null
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CdsProcessorHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class);
+
+    private CdsProcessorListener listener;
+
+    CdsProcessorHandler(final CdsProcessorListener listener) {
+        this.listener = listener;
+    }
+
+    CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
+        final ActionIdentifiers header = request.getActionIdentifiers();
+        LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+            header.getBlueprintName(), header.getBlueprintVersion());
+
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+        final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
+        final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
+            @Override
+            public void onNext(ExecutionServiceOutput output) {
+                listener.onMessage(output);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                listener.onError(throwable);
+                finishLatch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+                    header.getBlueprintName(), header.getBlueprintVersion());
+                finishLatch.countDown();
+            }
+        };
+
+        final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
+        try {
+            // Send the message to CDS backend for processing
+            requestObserver.onNext(request);
+            // Mark the end of requests
+            requestObserver.onCompleted();
+        } catch (RuntimeException e) {
+            requestObserver.onError(e);
+        }
+        return finishLatch;
+    }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java
new file mode 100644 (file)
index 0000000..94a336b
--- /dev/null
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.properties;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.onap.policy.common.parameters.ParameterGroup;
+import org.onap.policy.common.parameters.ParameterRuntimeException;
+import org.onap.policy.common.parameters.annotations.Max;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+@Getter
+@Setter
+@ToString
+public class CdsServerProperties implements ParameterGroup {
+
+    // Port range constants
+    private static final int MIN_USER_PORT = 1024;
+    private static final int MAX_USER_PORT = 65535;
+
+    private static final String INVALID_PROP = "Invalid CDS property: ";
+    private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties";
+
+    // CDS carrier properties
+    @Min(value = 1)
+    private int timeout;
+
+    @Min(value = MIN_USER_PORT)
+    @Max(value = MAX_USER_PORT)
+    private int port;
+
+    @NotNull
+    private String host;
+
+    @NotNull
+    private String username;
+
+    @NotNull
+    private String password;
+
+
+    @Override
+    public String getName() {
+        return SERVER_PROPERTIES_TYPE;
+    }
+
+    @Override
+    public void setName(final String name) {
+        throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName());
+    }
+
+    @Override
+    public GroupValidationResult validate() {
+        return new GroupValidationResult(this);
+    }
+
+    /**
+     * Generate base64-encoded Authorization header from username and password.
+     *
+     * @return Base64 encoded string
+     */
+    public String getBasicAuth() {
+        return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword())
+            .getBytes(StandardCharsets.UTF_8));
+    }
+}
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java
new file mode 100644 (file)
index 0000000..6dfd70d
--- /dev/null
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import io.grpc.Status;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used as a helper for the gRPC client unit test.
+ */
+public class TestCdsProcessorListenerImpl implements CdsProcessorListener {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class);
+
+    /**
+     * Used to verify/inspect message received from server.
+     */
+    @Override
+    public void onMessage(final ExecutionServiceOutput message) {
+        LOGGER.info("Received notification from CDS: {}", message);
+    }
+
+    /**
+     * Used to verify/inspect error received from server.
+     */
+    @Override
+    public void onError(final Throwable throwable) {
+        Status status = Status.fromThrowable(throwable);
+        LOGGER.error("Failed processing blueprint {}", status, throwable);
+    }
+}
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java
new file mode 100644 (file)
index 0000000..3b6ad7d
--- /dev/null
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class BasicAuthClientHeaderInterceptorTest {
+
+    // Generate a unique in-process server name.
+    private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+    private static final String CREDS = "test";
+
+    // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class,
+        delegatesTo(new TestServerInterceptor()));
+
+    private final CdsServerProperties props = new CdsServerProperties();
+
+    private ManagedChannel channel;
+
+    /**
+     * Setup the test.
+     *
+     * @throws IOException on failure to register the test grpc server for graceful shutdown
+     */
+    @Before
+    public void setUp() throws IOException {
+        // Setup the CDS properties
+        props.setHost(SERVER_NAME);
+        props.setPort(2000);
+        props.setUsername(CREDS);
+        props.setPassword(CREDS);
+        props.setTimeout(60);
+
+        // Implement the test gRPC server
+        BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {};
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor()
+            .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build()
+            .start());
+
+        // Create a client channel and register for automatic graceful shutdown
+        channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+    }
+
+    @Test
+    public void testIfBasicAuthHeaderIsDeliveredToCdsServer() {
+        BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc
+            .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props)));
+        ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
+        bpProcessingSvcStub.process(new StreamObserver<ExecutionServiceOutput>() {
+            @Override
+            public void onNext(final ExecutionServiceOutput executionServiceOutput) {
+                // Test purpose only
+            }
+
+            @Override
+            public void onError(final Throwable throwable) {
+                // Test purpose only
+            }
+
+            @Override
+            public void onCompleted() {
+                // Test purpose only
+            }
+        });
+        verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(),
+            ArgumentMatchers.any());
+
+        Key<String> authHeader = Key
+            .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+        String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS)
+            .getBytes(StandardCharsets.UTF_8));
+        assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader));
+    }
+
+    private static class TestServerInterceptor implements ServerInterceptor {
+
+        @Override
+        public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall,
+            final Metadata metadata,
+            final ServerCallHandler<ReqT, RespT> serverCallHandler) {
+            return serverCallHandler.startCall(serverCall, metadata);
+        }
+    }
+}
+
+
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java
new file mode 100644 (file)
index 0000000..b9a9a84
--- /dev/null
@@ -0,0 +1,182 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class CdsProcessorGrpcClientTest {
+
+    // Generate a unique in-process server name.
+    private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+
+    // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
+    private final CdsServerProperties props = new CdsServerProperties();
+    private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+    private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
+    private final List<String> messagesDelivered = new ArrayList<>();
+    private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+
+    private CdsProcessorGrpcClient client;
+
+    /**
+     * Setup the test.
+     *
+     * @throws IOException on failure to register the test grpc server for graceful shutdown
+     */
+    @Before
+    public void setUp() throws IOException {
+        // Setup the CDS properties
+        props.setHost(SERVER_NAME);
+        props.setPort(2000);
+        props.setUsername("testUser");
+        props.setPassword("testPassword");
+        props.setTimeout(60);
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
+            .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+        // Create a client channel and register for automatic graceful shutdown
+        ManagedChannel channel = grpcCleanup
+            .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+
+        // Create an instance of the gRPC client
+        client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener));
+
+        // Implement the test gRPC server
+        BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
+            @Override
+            public StreamObserver<ExecutionServiceInput> process(
+                final StreamObserver<ExecutionServiceOutput> responseObserver) {
+                responseObserverRef.set(responseObserver);
+
+                return new StreamObserver<ExecutionServiceInput>() {
+                    @Override
+                    public void onNext(final ExecutionServiceInput executionServiceInput) {
+                        messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
+                    }
+
+                    @Override
+                    public void onError(final Throwable throwable) {
+                        // Test method
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                        allRequestsDelivered.countDown();
+                    }
+                };
+            }
+        };
+        serviceRegistry.addService(testCdsBlueprintServerImpl);
+    }
+
+    @After
+    public void tearDown() {
+        client.close();
+    }
+
+    @Test
+    public void testCdsProcessorGrpcClientConstructor() {
+        new CdsProcessorGrpcClient(listener, props);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCdsProcessorGrpcClientConstructorFailure() {
+        props.setHost(null);
+        new CdsProcessorGrpcClient(listener, props);
+    }
+
+    @Test
+    public void testSendRequestFail() throws InterruptedException {
+        // Setup
+        ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
+            .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
+            .build();
+
+        // Act
+        CountDownLatch finishLatch = client.sendRequest(testReq);
+        responseObserverRef.get().onError(new Throwable("failed to send testReq."));
+
+        verify(listener).onError(any(Throwable.class));
+        assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testSendRequestSuccess() throws InterruptedException {
+        // Setup request
+        ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
+            .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
+
+        // Act
+        final CountDownLatch finishLatch = client.sendRequest(testReq1);
+
+        // Assert that request message was sent and delivered once to the server
+        assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
+        assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
+
+        // Setup the server to send out two simple response messages and verify that the client receives them.
+        ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
+            .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
+        ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
+            .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
+        responseObserverRef.get().onNext(testResp1);
+        verify(listener).onMessage(testResp1);
+        responseObserverRef.get().onNext(testResp2);
+        verify(listener).onMessage(testResp2);
+
+        // let server complete.
+        responseObserverRef.get().onCompleted();
+        assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+    }
+}
index 3d86769..48ab79a 100644 (file)
@@ -44,7 +44,6 @@
       <module>sdc</module>
       <module>vfc</module>
       <module>sdnc</module>
+      <module>cds</module>
   </modules>
-
-
 </project>