244b89a6f571e28cd3b37ad8c5895cf818257011
[so.git] / common / src / main / java / org / onap / so / client / cds / CDSProcessingHandler.java
1 /*
2  * Copyright (C) 2019 Bell Canada.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.so.client.cds;
17
18 import io.grpc.ManagedChannel;
19 import io.grpc.stub.StreamObserver;
20 import java.util.concurrent.CountDownLatch;
21 import org.onap.ccsdk.apps.controllerblueprints.common.api.ActionIdentifiers;
22 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
23 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
24 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput;
25 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 class CDSProcessingHandler {
30
31     private static final Logger log = LoggerFactory.getLogger(CDSProcessingHandler.class);
32
33     private CDSProcessingListener listener;
34
35     CDSProcessingHandler(final CDSProcessingListener listener) {
36         this.listener = listener;
37     }
38
39     CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
40
41         ActionIdentifiers header = request.getActionIdentifiers();
42
43         log.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(), header.getBlueprintName(),
44             header.getBlueprintVersion());
45
46         final CountDownLatch finishLatch = new CountDownLatch(1);
47
48         final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
49
50         final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
51             @Override
52             public void onNext(ExecutionServiceOutput output) {
53                 listener.onMessage(output);
54             }
55
56             @Override
57             public void onError(Throwable t) {
58                 listener.onError(t);
59                 finishLatch.countDown();
60             }
61
62             @Override
63             public void onCompleted() {
64                 log.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
65                     header.getBlueprintName(), header.getBlueprintVersion());
66                 finishLatch.countDown();
67             }
68         };
69
70         final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
71
72         try {
73             // Send our message to CDS backend for processing
74             requestObserver.onNext(request);
75             // Mark the end of requests
76             requestObserver.onCompleted();
77         } catch (RuntimeException e) {
78             requestObserver.onError(e);
79         }
80         return finishLatch;
81     }
82 }