0922fc4cb4691be6ceec84b1efade1465bef0251
[policy/models.git] / models-interactions / model-impl / cds / src / test / java / org / onap / policy / cds / client / CdsProcessorGrpcClientTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada.
4  * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.cds.client;
21
22 import static org.assertj.core.api.Assertions.assertThatCode;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertTrue;
25 import static org.mockito.ArgumentMatchers.any;
26 import static org.mockito.Mockito.spy;
27 import static org.mockito.Mockito.verify;
28
29 import io.grpc.ManagedChannel;
30 import io.grpc.inprocess.InProcessChannelBuilder;
31 import io.grpc.inprocess.InProcessServerBuilder;
32 import io.grpc.stub.StreamObserver;
33 import io.grpc.testing.GrpcCleanupRule;
34 import io.grpc.util.MutableHandlerRegistry;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Collections;
38 import java.util.List;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Rule;
45 import org.junit.Test;
46 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
47 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
48 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
49 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
50 import org.onap.policy.cds.api.CdsProcessorListener;
51 import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
52 import org.onap.policy.cds.properties.CdsServerProperties;
53
54 public class CdsProcessorGrpcClientTest {
55
56     // Generate a unique in-process server name.
57     private static final String SERVER_NAME = InProcessServerBuilder.generateName();
58
59     // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
60     @Rule
61     public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
62
63     private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
64     private final CdsServerProperties props = new CdsServerProperties();
65     private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
66     private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
67     private final List<String> messagesDelivered = new ArrayList<>();
68     private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
69
70     private CdsProcessorGrpcClient client;
71
72     /**
73      * Setup the test.
74      *
75      * @throws IOException on failure to register the test grpc server for graceful shutdown
76      */
77     @Before
78     public void setUp() throws IOException {
79         // Setup the CDS properties
80         props.setHost(SERVER_NAME);
81         props.setPort(2000);
82         props.setUsername("testUser");
83         props.setPassword("testPassword");
84         props.setTimeout(60);
85
86         // Create a server, add service, start, and register for automatic graceful shutdown.
87         grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
88             .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
89
90         // Create a client channel and register for automatic graceful shutdown
91         ManagedChannel channel = grpcCleanup
92             .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
93
94         // Create an instance of the gRPC client
95         client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener, "gRPC://localhost:1234/"));
96
97         // Implement the test gRPC server
98         BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
99             @Override
100             public StreamObserver<ExecutionServiceInput> process(
101                 final StreamObserver<ExecutionServiceOutput> responseObserver) {
102                 responseObserverRef.set(responseObserver);
103
104                 return new StreamObserver<ExecutionServiceInput>() {
105                     @Override
106                     public void onNext(final ExecutionServiceInput executionServiceInput) {
107                         messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
108                     }
109
110                     @Override
111                     public void onError(final Throwable throwable) {
112                         // Test method
113                     }
114
115                     @Override
116                     public void onCompleted() {
117                         allRequestsDelivered.countDown();
118                     }
119                 };
120             }
121         };
122         serviceRegistry.addService(testCdsBlueprintServerImpl);
123     }
124
125     @After
126     public void tearDown() {
127         client.close();
128     }
129
130     @Test
131     public void testCdsProcessorGrpcClientConstructor() {
132         assertThatCode(() -> new CdsProcessorGrpcClient(listener, props).close()).doesNotThrowAnyException();
133     }
134
135     @Test(expected = IllegalStateException.class)
136     public void testCdsProcessorGrpcClientConstructorFailure() {
137         props.setHost(null);
138         new CdsProcessorGrpcClient(listener, props).close();
139     }
140
141     @Test
142     public void testSendRequestFail() throws InterruptedException {
143         // Setup
144         ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
145             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
146             .build();
147
148         // Act
149         CountDownLatch finishLatch = client.sendRequest(testReq);
150         responseObserverRef.get().onError(new Throwable("failed to send testReq."));
151
152         verify(listener).onError(any(Throwable.class));
153         assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
154     }
155
156     @Test
157     public void testSendRequestSuccess() throws InterruptedException {
158         // Setup request
159         ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
160             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
161
162         // Act
163         final CountDownLatch finishLatch = client.sendRequest(testReq1);
164
165         // Assert that request message was sent and delivered once to the server
166         assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
167         assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
168
169         // Setup the server to send out two simple response messages and verify that the client receives them.
170         ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
171             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
172         ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
173             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
174         responseObserverRef.get().onNext(testResp1);
175         verify(listener).onMessage(testResp1);
176         responseObserverRef.get().onNext(testResp2);
177         verify(listener).onMessage(testResp2);
178
179         // let server complete.
180         responseObserverRef.get().onCompleted();
181         assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
182     }
183 }