bf4a015e3691dd3133cb3a254a3d445fd5d90c0a
[so.git] / bpmn / so-bpmn-infrastructure-flows / src / test / java / org / onap / so / GrpcNettyServer.java
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019 Nordix Foundation.
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  *
16  *  SPDX-License-Identifier: Apache-2.0
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.onap.so;
21
22 import io.grpc.ServerBuilder;
23 import io.grpc.stub.StreamObserver;
24 import io.grpc.testing.GrpcCleanupRule;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.atomic.AtomicReference;
30 import javax.annotation.PostConstruct;
31 import org.junit.Rule;
32 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Value;
40 import org.springframework.stereotype.Component;
41
42 @Component
43 public class GrpcNettyServer extends BluePrintProcessingServiceImplBase {
44
45     private static final Logger logger = LoggerFactory.getLogger(GrpcNettyServer.class);
46
47     @Value("${cds.endpoint}")
48     private String host;
49
50     @Value("${cds.port}")
51     private String port;
52
53     @Rule
54     public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
55
56     private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
57     private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
58     private final List<ExecutionServiceInput> detailedMessages = new ArrayList<>();
59
60     @PostConstruct
61     public void start() throws IOException {
62
63         final BluePrintProcessingServiceImplBase blueprintPrcessorImpl =
64             new BluePrintProcessingServiceImplBase() {
65                 @Override
66                 public StreamObserver<ExecutionServiceInput> process(
67                     StreamObserver<ExecutionServiceOutput> responseObserver) {
68
69                     responseObserverRef.set(responseObserver);
70
71                     StreamObserver<ExecutionServiceInput> requestObserver = new StreamObserver<ExecutionServiceInput>() {
72                         @Override
73                         public void onNext(ExecutionServiceInput message) {
74                             detailedMessages.add(message);
75                             logger.info("Message received: {}", message);
76                             ExecutionServiceOutput executionServiceOutput = ExecutionServiceOutput.newBuilder()
77                                 .setActionIdentifiers(message.getActionIdentifiers())
78                                 .setStatus(Status.newBuilder().setEventType(
79                                     EventType.EVENT_COMPONENT_EXECUTED).build()).build();
80
81                             responseObserverRef.get().onNext(executionServiceOutput);
82                             logger.info("Message sent: {}", executionServiceOutput);
83                         }
84
85                         @Override
86                         public void onError(Throwable t) {
87                             responseObserverRef.get().onError(t);
88                         }
89
90                         @Override
91                         public void onCompleted() {
92                             allRequestsDelivered.countDown();
93                             responseObserverRef.get().onCompleted();
94                         }
95                     };
96
97                     return requestObserver;
98                 }
99             };
100         grpcCleanup.register(
101             ServerBuilder.forPort(Integer.valueOf(port)).directExecutor().addService(blueprintPrcessorImpl).build()
102                 .start());
103
104     }
105
106     public List<ExecutionServiceInput> getDetailedMessages(){
107         return this.detailedMessages;
108     }
109
110 }