2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 * SPDX-License-Identifier: Apache-2.0
17 * ============LICENSE_END=========================================================
22 import io.grpc.ServerBuilder;
23 import io.grpc.stub.StreamObserver;
24 import io.grpc.testing.GrpcCleanupRule;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.atomic.AtomicReference;
31 import javax.annotation.PostConstruct;
32 import org.junit.Rule;
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Value;
41 import org.springframework.stereotype.Component;
/**
 * gRPC server stub for the blueprint processing service.
 *
 * <p>{@code start()} registers a service that stores every received
 * {@link ExecutionServiceInput} in {@code detailedMessages} and answers it with an
 * {@link ExecutionServiceOutput} whose status event type is {@code EVENT_COMPONENT_EXECUTED}.
 * The stored requests can be read back and cleared through the accessors of this class.
 */
public class GrpcNettyServer extends BluePrintProcessingServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(GrpcNettyServer.class);
    // NOTE(review): the field this annotation belongs to is not visible in this excerpt, and the
    // declaration of the `port` value read by start() is not visible either - confirm both
    // against the complete file.
    @Value("${cds.endpoint}")
    // Cleanup rule that the server created in start() is registered with.
    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
    // Counted down (from 1) when a request stream reports completion; no await is visible here.
    private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
    // Response observer of the most recently opened process() stream; overwritten on every call.
    private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
    // Every request message received so far, in arrival order. Plain ArrayList: the list itself
    // provides no synchronization.
    private final List<ExecutionServiceInput> detailedMessages = new ArrayList<>();
62 public void start() throws IOException {
64 final BluePrintProcessingServiceImplBase blueprintPrcessorImpl = new BluePrintProcessingServiceImplBase() {
66 public StreamObserver<ExecutionServiceInput> process(
67 StreamObserver<ExecutionServiceOutput> responseObserver) {
69 responseObserverRef.set(responseObserver);
71 StreamObserver<ExecutionServiceInput> requestObserver = new StreamObserver<ExecutionServiceInput>() {
73 public void onNext(ExecutionServiceInput message) {
74 detailedMessages.add(message);
75 logger.info("Message received: {}", message);
76 ExecutionServiceOutput executionServiceOutput = ExecutionServiceOutput.newBuilder()
77 .setActionIdentifiers(message.getActionIdentifiers())
78 .setStatus(Status.newBuilder().setEventType(EventType.EVENT_COMPONENT_EXECUTED).build())
81 responseObserverRef.get().onNext(executionServiceOutput);
82 logger.info("Message sent: {}", executionServiceOutput);
86 public void onError(Throwable t) {
87 responseObserverRef.get().onError(t);
91 public void onCompleted() {
92 allRequestsDelivered.countDown();
93 responseObserverRef.get().onCompleted();
97 return requestObserver;
100 grpcCleanup.register(ServerBuilder.forPort(Integer.valueOf(port)).directExecutor()
101 .addService(blueprintPrcessorImpl).build().start());
105 public List<ExecutionServiceInput> getDetailedMessages() {
106 return this.detailedMessages;
109 public void cleanMessage() {
110 this.detailedMessages.clear();