35c5fe1c14b52d652158a596cc5947efd6050fbd
[so.git] / bpmn / so-bpmn-infrastructure-flows / src / test / java / org / onap / so / GrpcNettyServer.java
1 /*
2  * ============LICENSE_START======================================================= Copyright (C) 2019 Nordix
3  * Foundation. ================================================================================ Licensed under the
4  * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
5  * obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
9  * either express or implied. See the License for the specific language governing permissions and limitations under the
10  * License.
11  *
12  * SPDX-License-Identifier: Apache-2.0 ============LICENSE_END=========================================================
13  */
14
15 package org.onap.so;
16
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import org.junit.Rule;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BlueprintProcessingServiceGrpc.BlueprintProcessingServiceImplBase;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
36
37 @Component
38 public class GrpcNettyServer extends BlueprintProcessingServiceImplBase {
39
40     private static final Logger logger = LoggerFactory.getLogger(GrpcNettyServer.class);
41
42     @Value("${cds.endpoint}")
43     private String host;
44
45     @Value("${cds.port}")
46     private String port;
47
48     @Rule
49     public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
50
51     private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
52     private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
53     private final List<ExecutionServiceInput> detailedMessages = new ArrayList<>();
54
55     @PostConstruct
56     public void start() throws IOException {
57
58         final BlueprintProcessingServiceImplBase blueprintPrcessorImpl = new BlueprintProcessingServiceImplBase() {
59             @Override
60             public StreamObserver<ExecutionServiceInput> process(
61                     StreamObserver<ExecutionServiceOutput> responseObserver) {
62
63                 responseObserverRef.set(responseObserver);
64
65                 StreamObserver<ExecutionServiceInput> requestObserver = new StreamObserver<ExecutionServiceInput>() {
66                     @Override
67                     public void onNext(ExecutionServiceInput message) {
68                         detailedMessages.add(message);
69                         logger.info("Message received: {}", message);
70                         ExecutionServiceOutput executionServiceOutput = ExecutionServiceOutput.newBuilder()
71                                 .setActionIdentifiers(message.getActionIdentifiers())
72                                 .setStatus(Status.newBuilder().setEventType(EventType.EVENT_COMPONENT_EXECUTED).build())
73                                 .build();
74
75                         responseObserverRef.get().onNext(executionServiceOutput);
76                         logger.info("Message sent: {}", executionServiceOutput);
77                     }
78
79                     @Override
80                     public void onError(Throwable t) {
81                         responseObserverRef.get().onError(t);
82                     }
83
84                     @Override
85                     public void onCompleted() {
86                         allRequestsDelivered.countDown();
87                         responseObserverRef.get().onCompleted();
88                     }
89                 };
90
91                 return requestObserver;
92             }
93         };
94         grpcCleanup.register(ServerBuilder.forPort(Integer.valueOf(port)).directExecutor()
95                 .addService(blueprintPrcessorImpl).build().start());
96
97     }
98
99     public List<ExecutionServiceInput> getDetailedMessages() {
100         return this.detailedMessages;
101     }
102
103     public void resetList() {
104         detailedMessages.clear();
105     }
106
107 }