2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada.
4 * Modifications Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.cds.client;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.verify;
28 import io.grpc.ManagedChannel;
29 import io.grpc.inprocess.InProcessChannelBuilder;
30 import io.grpc.inprocess.InProcessServerBuilder;
31 import io.grpc.stub.StreamObserver;
32 import io.grpc.testing.GrpcCleanupRule;
33 import io.grpc.util.MutableHandlerRegistry;
34 import java.io.IOException;
35 import java.util.ArrayList;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Rule;
44 import org.junit.Test;
45 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
46 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
47 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
48 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
49 import org.onap.policy.cds.api.CdsProcessorListener;
50 import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
51 import org.onap.policy.cds.properties.CdsServerProperties;
53 public class CdsProcessorGrpcClientTest {
// Shared fixture state for the in-process gRPC client/server tests in this class.
55 // Generate a unique in-process server name.
56 private static final String SERVER_NAME = InProcessServerBuilder.generateName();
58 // Manages automatic graceful shutdown for the registered server and client channels at the end of each test.
60 public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
// Mockito spy around the test listener, so the tests can verify() which callbacks the client invoked.
62 private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
// CDS connection properties; filled in by setUp() and passed to the (listener, props) client constructor.
63 private final CdsServerProperties props = new CdsServerProperties();
// Handler registry installed as the in-process server's fallback registry; the stub service is added to it in setUp().
64 private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
// Holds the response observer handed to the stub's process(), letting a test push responses or errors to the client.
65 private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
// Action names of the requests the stub server received, in arrival order.
66 private final List<String> messagesDelivered = new ArrayList<>();
// Counted down by the stub's onCompleted() on the request stream.
67 private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
// Client under test; created in setUp() on top of the in-process channel.
69 private CdsProcessorGrpcClient client;
74 * @throws IOException on failure to register the test gRPC server for graceful shutdown
// Builds the test fixture: populates the CDS properties, starts an in-process gRPC server that
// resolves services through serviceRegistry, opens a channel to it, creates the client under test
// on that channel, and finally registers a stub BluePrintProcessingService implementation.
// NOTE(review): this excerpt appears to be missing some lines of the method (e.g. between the
// property setters) — confirm against the full file before relying on this summary.
77 public void setUp() throws IOException {
78 // Set up the CDS properties
79 props.setHost(SERVER_NAME);
81 props.setUsername("testUser");
82 props.setPassword("testPassword");
85 // Create a server, add service, start, and register for automatic graceful shutdown.
// NOTE(review): directExecutor() is used on both the server and the channel; presumably this makes
// the in-process calls run on the calling test thread, which is what lets the tests assert right
// after each call — confirm against the gRPC documentation.
86 grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
87 .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
89 // Create a client channel and register for automatic graceful shutdown
90 ManagedChannel channel = grpcCleanup
91 .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
93 // Create an instance of the gRPC client
94 client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener));
96 // Implement the test gRPC server
97 BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
99 public StreamObserver<ExecutionServiceInput> process(
100 final StreamObserver<ExecutionServiceOutput> responseObserver) {
// Keep the response observer so a test can later drive the server side of the call.
101 responseObserverRef.set(responseObserver);
103 return new StreamObserver<ExecutionServiceInput>() {
105 public void onNext(final ExecutionServiceInput executionServiceInput) {
// Record only the action name of each incoming request; tests compare against this list.
106 messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
// NOTE(review): the body of onError is not visible in this excerpt.
110 public void onError(final Throwable throwable) {
115 public void onCompleted() {
// Signal the waiting test that the request stream has been completed.
116 allRequestsDelivered.countDown();
// Register the stub after the server has started; the fallback registry is mutable, so this is allowed.
121 serviceRegistry.addService(testCdsBlueprintServerImpl);
// Per-test cleanup hook.
// NOTE(review): the method body is not visible in this excerpt; presumably it releases the client
// created in setUp() — confirm against the full file.
125 public void tearDown() {
// Checks that a client can be constructed from the shared listener and props fields and then
// closed; the test has no explicit assertions, so it passes as long as nothing is thrown.
130 public void testCdsProcessorGrpcClientConstructor() {
131 new CdsProcessorGrpcClient(listener, props).close();
// Expects an IllegalStateException while constructing (or closing) a client from listener and props.
// NOTE(review): the statement that puts props into an invalid state is not visible in this
// excerpt — confirm which precondition the test violates.
134 @Test(expected = IllegalStateException.class)
135 public void testCdsProcessorGrpcClientConstructorFailure() {
137 new CdsProcessorGrpcClient(listener, props).close();
// Error path: sends one request, fails the call from the stub server side, then checks that the
// listener's onError callback was invoked and that the latch returned by sendRequest() is released.
141 public void testSendRequestFail() throws InterruptedException {
// NOTE(review): the end of this builder chain is not visible in this excerpt.
143 ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
144 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
148 CountDownLatch finishLatch = client.sendRequest(testReq);
// Fail the call through the response observer captured by the stub's process().
149 responseObserverRef.get().onError(new Throwable("failed to send testReq."));
151 verify(listener).onError(any(Throwable.class));
// Zero timeout: await() only returns true if the latch has already reached zero, so this asserts
// that the client released it by the time onError() returned.
152 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
// Happy path: sends one request, checks that the stub server received exactly that request, pushes
// two responses back and verifies the listener saw each of them, then completes the call from the
// server side and checks that the latch returned by sendRequest() is released.
156 public void testSendRequestSuccess() throws InterruptedException {
158 ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
159 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
162 final CountDownLatch finishLatch = client.sendRequest(testReq1);
164 // Assert that request message was sent and delivered once to the server
// allRequestsDelivered is counted down in the stub's onCompleted(); messagesDelivered holds the
// action names the stub recorded in onNext().
165 assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
166 assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
168 // Set up the server to send out two simple response messages and verify that the client receives them.
169 ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
170 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
171 ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
172 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
// Each response is verified right after it is sent, so a missing delivery is pinned to a specific message.
173 responseObserverRef.get().onNext(testResp1);
174 verify(listener).onMessage(testResp1);
175 responseObserverRef.get().onNext(testResp2);
176 verify(listener).onMessage(testResp2);
178 // let server complete.
179 responseObserverRef.get().onCompleted();
// Zero timeout: the latch must already be at zero once the server has completed the call.
180 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));