1a35064cab78d6e021f070a3b8ab11b18adee984
[policy/models.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada.
4  * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
5  * Modifications Copyright (C) 2024 Nordix Foundation
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.cds.client;
22
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
import org.onap.policy.cds.properties.CdsServerProperties;
53
54 class CdsProcessorGrpcClientTest {
55
56     private CdsProcessorListener listener;
57     private CdsServerProperties props;
58     private MutableHandlerRegistry serviceRegistry;
59     private AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef;
60     private List<String> messagesDelivered;
61     private CountDownLatch allRequestsDelivered;
62
63     private ManagedChannel channel;
64     private CdsProcessorGrpcClient client;
65
66     /**
67      * Setup the test.
68      *
69      * @throws IOException on failure to register the test grpc server for graceful shutdown
70      */
71     @BeforeEach
72     void setUp() throws IOException {
73
74         listener = spy(new TestCdsProcessorListenerImpl());
75         props = new CdsServerProperties();
76         serviceRegistry = new MutableHandlerRegistry();
77         responseObserverRef = new AtomicReference<>();
78         messagesDelivered = new ArrayList<>();
79         allRequestsDelivered = new CountDownLatch(1);
80
81         // Setup the CDS properties
82         // Generate a unique in-process server name.
83         String serverName = InProcessServerBuilder.generateName();
84         props.setHost(serverName);
85         props.setPort(2000);
86         props.setUsername("testUser");
87         props.setPassword("testPassword");
88         props.setTimeout(60);
89
90         // Create a server, add service, start, and register for automatic graceful shutdown.
91         InProcessServerBuilder.forName(serverName)
92             .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start();
93
94         // Create a client channel
95         channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
96
97         // Create an instance of the gRPC client
98         client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener, "gRPC://localhost:1234/"));
99
100         // Implement the test gRPC server
101         BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
102             @Override
103             public StreamObserver<ExecutionServiceInput> process(
104                 final StreamObserver<ExecutionServiceOutput> responseObserver) {
105                 responseObserverRef.set(responseObserver);
106
107                 return new StreamObserver<ExecutionServiceInput>() {
108                     @Override
109                     public void onNext(final ExecutionServiceInput executionServiceInput) {
110                         messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
111                     }
112
113                     @Override
114                     public void onError(final Throwable throwable) {
115                         // Test method
116                     }
117
118                     @Override
119                     public void onCompleted() {
120                         allRequestsDelivered.countDown();
121                     }
122                 };
123             }
124         };
125         serviceRegistry.addService(testCdsBlueprintServerImpl);
126     }
127
128     /**
129      * Cleans up resources after each test execution.
130      * This method ensures that the gRPC client and channel are properly closed and released after each test.
131      * It is annotated with {@code @AfterEach} to automatically run after each test method in the class.
132      * If the {@code client} is not {@code null}, it calls the {@code close} method to release resources
133      *     used by the client.
134      * If the {@code channel} is not {@code null}, it calls the {@code shutdownNow} method
135      *     to forcefully close the channel.
136      */
137     @AfterEach
138     void tearDown() {
139         if (client != null) {
140             client.close();
141         }
142         if (channel != null) {
143             channel.shutdownNow();
144         }
145     }
146
147     @Test
148     void testCdsProcessorGrpcClientConstructor() {
149         assertThatCode(() -> new CdsProcessorGrpcClient(listener, props).close()).doesNotThrowAnyException();
150     }
151
152     @Test
153     void testCdsProcessorGrpcClientConstructorFailure() {
154         props.setHost(null);
155         assertThrows(IllegalStateException.class, () -> {
156             new CdsProcessorGrpcClient(listener, props).close();
157         });
158     }
159
160     @Test
161     void testSendRequestFail() throws InterruptedException {
162         // Setup
163         ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
164             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
165             .build();
166
167         // Act
168         CountDownLatch finishLatch = client.sendRequest(testReq);
169         responseObserverRef.get().onError(new Throwable("failed to send testReq."));
170
171         verify(listener).onError(any(Throwable.class));
172         assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
173     }
174
175     @Test
176     void testSendRequestSuccess() throws InterruptedException {
177         // Setup request
178         ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
179             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
180
181         // Act
182         final CountDownLatch finishLatch = client.sendRequest(testReq1);
183
184         // Assert that request message was sent and delivered once to the server
185         assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
186         assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
187
188         // Setup the server to send out two simple response messages and verify that the client receives them.
189         ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
190             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
191         ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
192             .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
193         responseObserverRef.get().onNext(testResp1);
194         verify(listener).onMessage(testResp1);
195         responseObserverRef.get().onNext(testResp2);
196         verify(listener).onMessage(testResp2);
197
198         // let server complete.
199         responseObserverRef.get().onCompleted();
200         assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
201     }
202 }