2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada.
4 * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 * Modifications Copyright (C) 2024 Nordix Foundation
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.cds.client;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.junit.jupiter.api.Assertions.assertEquals;
25 import static org.junit.jupiter.api.Assertions.assertThrows;
26 import static org.junit.jupiter.api.Assertions.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.verify;
31 import io.grpc.ManagedChannel;
32 import io.grpc.inprocess.InProcessChannelBuilder;
33 import io.grpc.inprocess.InProcessServerBuilder;
34 import io.grpc.stub.StreamObserver;
35 import io.grpc.util.MutableHandlerRegistry;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.jupiter.api.AfterEach;
44 import org.junit.jupiter.api.BeforeEach;
45 import org.junit.jupiter.api.Test;
46 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
47 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
48 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
49 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
50 import org.onap.policy.cds.api.CdsProcessorListener;
51 import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
52 import org.onap.policy.cds.properties.CdsServerProperties;
54 class CdsProcessorGrpcClientTest {
56 private CdsProcessorListener listener;
57 private CdsServerProperties props;
58 private MutableHandlerRegistry serviceRegistry;
59 private AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef;
60 private List<String> messagesDelivered;
61 private CountDownLatch allRequestsDelivered;
63 private ManagedChannel channel;
64 private CdsProcessorGrpcClient client;
69 * @throws IOException on failure to register the test grpc server for graceful shutdown
72 void setUp() throws IOException {
74 listener = spy(new TestCdsProcessorListenerImpl());
75 props = new CdsServerProperties();
76 serviceRegistry = new MutableHandlerRegistry();
77 responseObserverRef = new AtomicReference<>();
78 messagesDelivered = new ArrayList<>();
79 allRequestsDelivered = new CountDownLatch(1);
81 // Setup the CDS properties
82 // Generate a unique in-process server name.
83 String serverName = InProcessServerBuilder.generateName();
84 props.setHost(serverName);
86 props.setUsername("testUser");
87 props.setPassword("testPassword");
90 // Create a server, add service, start, and register for automatic graceful shutdown.
91 InProcessServerBuilder.forName(serverName)
92 .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start();
94 // Create a client channel
95 channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
97 // Create an instance of the gRPC client
98 client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener, "gRPC://localhost:1234/"));
100 // Implement the test gRPC server
101 BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
103 public StreamObserver<ExecutionServiceInput> process(
104 final StreamObserver<ExecutionServiceOutput> responseObserver) {
105 responseObserverRef.set(responseObserver);
107 return new StreamObserver<ExecutionServiceInput>() {
109 public void onNext(final ExecutionServiceInput executionServiceInput) {
110 messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
114 public void onError(final Throwable throwable) {
119 public void onCompleted() {
120 allRequestsDelivered.countDown();
125 serviceRegistry.addService(testCdsBlueprintServerImpl);
129 * Cleans up resources after each test execution.
130 * This method ensures that the gRPC client and channel are properly closed and released after each test.
131 * It is annotated with {@code @AfterEach} to automatically run after each test method in the class.
132 * If the {@code client} is not {@code null}, it calls the {@code close} method to release resources
133 * used by the client.
134 * If the {@code channel} is not {@code null}, it calls the {@code shutdownNow} method
135 * to forcefully close the channel.
139 if (client != null) {
142 if (channel != null) {
143 channel.shutdownNow();
148 void testCdsProcessorGrpcClientConstructor() {
149 assertThatCode(() -> new CdsProcessorGrpcClient(listener, props).close()).doesNotThrowAnyException();
153 void testCdsProcessorGrpcClientConstructorFailure() {
155 assertThrows(IllegalStateException.class, () -> {
156 new CdsProcessorGrpcClient(listener, props).close();
161 void testSendRequestFail() throws InterruptedException {
163 ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
164 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
168 CountDownLatch finishLatch = client.sendRequest(testReq);
169 responseObserverRef.get().onError(new Throwable("failed to send testReq."));
171 verify(listener).onError(any(Throwable.class));
172 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
176 void testSendRequestSuccess() throws InterruptedException {
178 ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
179 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
182 final CountDownLatch finishLatch = client.sendRequest(testReq1);
184 // Assert that request message was sent and delivered once to the server
185 assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
186 assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
188 // Setup the server to send out two simple response messages and verify that the client receives them.
189 ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
190 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
191 ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
192 .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
193 responseObserverRef.get().onNext(testResp1);
194 verify(listener).onMessage(testResp1);
195 responseObserverRef.get().onNext(testResp2);
196 verify(listener).onMessage(testResp2);
198 // let server complete.
199 responseObserverRef.get().onCompleted();
200 assertTrue(finishLatch.await(0, TimeUnit.SECONDS));