2 * Copyright (C) 2019 Bell Canada.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onap.so.client.cds;
18 import io.grpc.ManagedChannel;
19 import io.grpc.stub.StreamObserver;
20 import java.util.concurrent.CountDownLatch;
21 import org.onap.ccsdk.apps.controllerblueprints.common.api.ActionIdentifiers;
22 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
23 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
24 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput;
25 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 class CDSProcessingHandler {
31 private static final Logger log = LoggerFactory.getLogger(CDSProcessingHandler.class);
33 private CDSProcessingListener listener;
/**
 * Creates a handler that hands each response received from the CDS backend to the given listener.
 *
 * @param listener callback invoked with every {@code ExecutionServiceOutput}; must not be null
 * @throws NullPointerException if {@code listener} is null
 */
CDSProcessingHandler(final CDSProcessingListener listener) {
    // Fail fast: without this check a null listener would only surface later as a
    // NullPointerException raised inside the asynchronous onNext callback.
    this.listener = java.util.Objects.requireNonNull(listener, "listener");
}
39 CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
41 ActionIdentifiers header = request.getActionIdentifiers();
43 log.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(), header.getBlueprintName(),
44 header.getBlueprintVersion());
46 final CountDownLatch finishLatch = new CountDownLatch(1);
48 final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
50 final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
52 public void onNext(ExecutionServiceOutput output) {
53 listener.onMessage(output);
57 public void onError(Throwable t) {
59 finishLatch.countDown();
63 public void onCompleted() {
64 log.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
65 header.getBlueprintName(), header.getBlueprintVersion());
66 finishLatch.countDown();
70 final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
73 // Send our message to CDS backend for processing
74 requestObserver.onNext(request);
75 // Mark the end of requests
76 requestObserver.onCompleted();
77 } catch (RuntimeException e) {
78 requestObserver.onError(e);