2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada.
4 * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.cds.client;
22 import static org.assertj.core.api.Assertions.assertThatCode;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.verify;
29 import io.grpc.ManagedChannel;
30 import io.grpc.inprocess.InProcessChannelBuilder;
31 import io.grpc.inprocess.InProcessServerBuilder;
32 import io.grpc.stub.StreamObserver;
33 import io.grpc.testing.GrpcCleanupRule;
34 import io.grpc.util.MutableHandlerRegistry;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.List;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Rule;
45 import org.junit.Test;
46 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
47 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BlueprintProcessingServiceGrpc.BlueprintProcessingServiceImplBase;
48 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
49 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
50 import org.onap.policy.cds.api.CdsProcessorListener;
51 import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
52 import org.onap.policy.cds.properties.CdsServerProperties;
// Unit tests for CdsProcessorGrpcClient, run against a gRPC in-process server and channel.
54 public class CdsProcessorGrpcClientTest {
56 // Generate a unique in-process server name.
57 private static final String SERVER_NAME = InProcessServerBuilder.generateName();
59 // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
61 public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
// Mockito spy around the test listener so the tests can verify() which callbacks were invoked.
63 private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
// CDS connection properties; populated in setUp().
64 private final CdsServerProperties props = new CdsServerProperties();
// Registry the in-process server falls back to; the fake CDS service is added to it in setUp().
65 private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
// Holds the server-side response observer captured in process(), so tests can push responses or errors to the client.
66 private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
// Action names of the requests the fake server has received, in arrival order.
67 private final List<String> messagesDelivered = new ArrayList<>();
// Counted down when the fake server sees the client's request stream complete.
68 private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
// Client under test; created in setUp() on top of the in-process channel.
70 private CdsProcessorGrpcClient client;
75 * @throws IOException on failure to register the test grpc server for graceful shutdown
78 public void setUp() throws IOException {
// Builds the whole fixture: CDS properties, in-process server and channel, the client under test,
// and a fake BlueprintProcessingService that records what it receives.
79 // Set up the CDS properties
80 props.setHost(SERVER_NAME);
82 props.setUsername("testUser");
83 props.setPassword("testPassword");
86 // Create a server, add service, start, and register for automatic graceful shutdown.
// The server resolves services through serviceRegistry, so the service can be added after start().
87 grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
88 .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
90 // Create a client channel and register for automatic graceful shutdown
// Both server and channel use directExecutor(); presumably this keeps callbacks on the calling
// thread so the tests can assert right after each call (confirm against the gRPC docs).
91 ManagedChannel channel = grpcCleanup
92 .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
94 // Create an instance of the gRPC client
95 client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener, "gRPC://localhost:1234/"));
97 // Implement the test gRPC server
98 BlueprintProcessingServiceImplBase testCdsBlueprintServerImpl = new BlueprintProcessingServiceImplBase() {
100 public StreamObserver<ExecutionServiceInput> process(
101 final StreamObserver<ExecutionServiceOutput> responseObserver) {
// Expose the response observer so the tests can drive server-to-client traffic.
102 responseObserverRef.set(responseObserver);
104 return new StreamObserver<ExecutionServiceInput>() {
106 public void onNext(final ExecutionServiceInput executionServiceInput) {
// Record the action name of each request the server receives.
107 messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
// NOTE(review): no handling is visible for onError here; confirm that ignoring request-stream errors is intended.
111 public void onError(final Throwable throwable) {
116 public void onCompleted() {
// Signal the tests that the client has finished sending requests.
117 allRequestsDelivered.countDown();
// Register the fake service with the registry the in-process server consults.
122 serviceRegistry.addService(testCdsBlueprintServerImpl);
// Per-test cleanup hook. NOTE(review): presumably annotated with @After (org.junit.After is imported); the body is not visible here, so confirm what it releases.
126 public void tearDown() {
// Constructing a client from the listener and the properties prepared in setUp(), then closing it, must not throw.
131 public void testCdsProcessorGrpcClientConstructor() {
132 assertThatCode(() -> new CdsProcessorGrpcClient(listener, props).close()).doesNotThrowAnyException();
// Expects an IllegalStateException from constructing (or closing) the client.
// NOTE(review): the step that makes props invalid is not visible here; confirm what triggers the exception.
135 @Test(expected = IllegalStateException.class)
136 public void testCdsProcessorGrpcClientConstructorFailure() {
138 new CdsProcessorGrpcClient(listener, props).close();
// Sends one request, has the fake server answer with an error, and checks that the listener is notified.
142 public void testSendRequestFail() throws InterruptedException {
// Request carrying only an action name.
144 ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
145 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
149 CountDownLatch finishLatch = client.sendRequest(testReq);
// Fail the call from the server side through the observer captured in process().
150 responseObserverRef.get().onError(new Throwable("failed to send testReq."));
// The listener spy must have been told about the error.
152 verify(listener).onError(any(Throwable.class));
// A zero timeout does not wait: this passes only if the latch has already reached zero.
153 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
// Sends one request, checks that the fake server received it, then streams two responses back
// and checks that the listener gets each one before the call completes.
157 public void testSendRequestSuccess() throws InterruptedException {
// Request carrying only an action name.
159 ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
160 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
163 final CountDownLatch finishLatch = client.sendRequest(testReq1);
165 // Assert that the request message was sent and delivered once to the server
// allRequestsDelivered is counted down in the fake server's onCompleted(), i.e. once the client closes its request stream.
166 assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
167 assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
169 // Set up the server to send out two simple response messages and verify that the client receives them.
170 ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
171 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
172 ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
173 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
// Each response is verified right after it is pushed.
174 responseObserverRef.get().onNext(testResp1);
175 verify(listener).onMessage(testResp1);
176 responseObserverRef.get().onNext(testResp2);
177 verify(listener).onMessage(testResp2);
179 // let server complete.
180 responseObserverRef.get().onCompleted();
// A zero timeout does not wait: this passes only if the latch has already reached zero.
181 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));