2 * ============LICENSE_START======================================================= Copyright (C) 2019 Nordix
3 * Foundation. ================================================================================ Licensed under the
4 * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
5 * obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
9 * either express or implied. See the License for the specific language governing permissions and limitations under the
12 * SPDX-License-Identifier: Apache-2.0 ============LICENSE_END=========================================================
17 import io.grpc.ServerBuilder;
18 import io.grpc.stub.StreamObserver;
19 import io.grpc.testing.GrpcCleanupRule;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.atomic.AtomicReference;
25 import javax.annotation.PostConstruct;
26 import org.junit.Rule;
27 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
28 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
29 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
30 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
31 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Value;
35 import org.springframework.stereotype.Component;
// gRPC service stub for BluePrintProcessingService: start() brings up a server whose handler
// records every ExecutionServiceInput it receives and answers each one with an
// EVENT_COMPONENT_EXECUTED status.
38 public class GrpcNettyServer extends BluePrintProcessingServiceImplBase {
40 private static final Logger logger = LoggerFactory.getLogger(GrpcNettyServer.class);
// Spring property placeholder "cds.endpoint".
// NOTE(review): the field this annotation applies to is not visible in this excerpt, and the
// `port` value read by start() is not declared in the visible lines either - confirm against
// the full file.
42 @Value("${cds.endpoint}")
// gRPC testing cleanup rule; the server created in start() is registered with it.
49 public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
// Counted down once, from onCompleted() of the request observer built in start().
// NOTE(review): nothing in the visible lines awaits this latch - confirm whether it is used.
51 private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
// Response observer of the most recent process() call; every new call overwrites the previous value.
52 private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
// Every request message received so far, in arrival order. This is a plain ArrayList with no
// synchronization in the visible code.
// NOTE(review): it is appended to from the request observer's onNext() and handed out by
// getDetailedMessages(); confirm that readers only look at it after the call has finished.
53 private final List<ExecutionServiceInput> detailedMessages = new ArrayList<>();
// Builds an anonymous BluePrintProcessingService implementation, starts a gRPC server for it on
// `port`, and registers that server with grpcCleanup.
// Declares IOException; presumably propagated from the server's start() call - confirm.
56 public void start() throws IOException {
// Service implementation that is added to the server at the end of this method.
58 final BluePrintProcessingServiceImplBase blueprintPrcessorImpl = new BluePrintProcessingServiceImplBase() {
// Handles a streaming "process" call: stores the caller's response observer and returns the
// observer that will receive the caller's request messages.
60 public StreamObserver<ExecutionServiceInput> process(
61 StreamObserver<ExecutionServiceOutput> responseObserver) {
// The callbacks below do not use `responseObserver` directly; they read it back from the shared
// reference, so a later process() call redirects their output to the newer observer.
63 responseObserverRef.set(responseObserver);
65 StreamObserver<ExecutionServiceInput> requestObserver = new StreamObserver<ExecutionServiceInput>() {
// For each request: record it, then reply with an output that copies the request's action
// identifiers and carries an EVENT_COMPONENT_EXECUTED status.
67 public void onNext(ExecutionServiceInput message) {
68 detailedMessages.add(message);
69 logger.info("Message received: {}", message);
// NOTE(review): the end of this builder chain is not visible in this excerpt.
70 ExecutionServiceOutput executionServiceOutput = ExecutionServiceOutput.newBuilder()
71 .setActionIdentifiers(message.getActionIdentifiers())
72 .setStatus(Status.newBuilder().setEventType(EventType.EVENT_COMPONENT_EXECUTED).build())
75 responseObserverRef.get().onNext(executionServiceOutput);
76 logger.info("Message sent: {}", executionServiceOutput);
// Forwards a request-stream error unchanged to the stored response observer.
80 public void onError(Throwable t) {
81 responseObserverRef.get().onError(t);
// Request stream finished: release the latch first, then complete the response stream.
85 public void onCompleted() {
86 allRequestsDelivered.countDown();
87 responseObserverRef.get().onCompleted();
91 return requestObserver;
// Build a server on `port` (converted with Integer.valueOf) configured with directExecutor(),
// start it, and register the started server with grpcCleanup.
94 grpcCleanup.register(ServerBuilder.forPort(Integer.valueOf(port)).directExecutor()
95 .addService(blueprintPrcessorImpl).build().start());
// Returns the internal list of recorded request messages itself, not a copy: callers observe
// messages added later and can modify the list.
99 public List<ExecutionServiceInput> getDetailedMessages() {
100 return this.detailedMessages;