2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Nordix
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 * SPDX-License-Identifier: Apache-2.0
17 * ============LICENSE_END=========================================================
22 import io.grpc.ServerBuilder;
23 import io.grpc.stub.StreamObserver;
24 import io.grpc.testing.GrpcCleanupRule;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.atomic.AtomicReference;
30 import javax.annotation.PostConstruct;
31 import org.junit.Rule;
32 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Value;
40 import org.springframework.stereotype.Component;
43 public class GrpcNettyServer extends BluePrintProcessingServiceImplBase {
45 private static final Logger logger = LoggerFactory.getLogger(GrpcNettyServer.class);
47 @Value("${cds.endpoint}")
54 public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
56 private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
57 private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
58 private final List<ExecutionServiceInput> detailedMessages = new ArrayList<>();
61 public void start() throws IOException {
63 final BluePrintProcessingServiceImplBase blueprintPrcessorImpl = new BluePrintProcessingServiceImplBase() {
65 public StreamObserver<ExecutionServiceInput> process(
66 StreamObserver<ExecutionServiceOutput> responseObserver) {
68 responseObserverRef.set(responseObserver);
70 StreamObserver<ExecutionServiceInput> requestObserver = new StreamObserver<ExecutionServiceInput>() {
72 public void onNext(ExecutionServiceInput message) {
73 detailedMessages.add(message);
74 logger.info("Message received: {}", message);
75 ExecutionServiceOutput executionServiceOutput = ExecutionServiceOutput.newBuilder()
76 .setActionIdentifiers(message.getActionIdentifiers())
77 .setStatus(Status.newBuilder().setEventType(EventType.EVENT_COMPONENT_EXECUTED).build())
80 responseObserverRef.get().onNext(executionServiceOutput);
81 logger.info("Message sent: {}", executionServiceOutput);
85 public void onError(Throwable t) {
86 responseObserverRef.get().onError(t);
90 public void onCompleted() {
91 allRequestsDelivered.countDown();
92 responseObserverRef.get().onCompleted();
96 return requestObserver;
99 grpcCleanup.register(ServerBuilder.forPort(Integer.valueOf(port)).directExecutor()
100 .addService(blueprintPrcessorImpl).build().start());
104 public List<ExecutionServiceInput> getDetailedMessages() {
105 return this.detailedMessages;
108 public void cleanMessage() {
109 this.detailedMessages.clear();