Updating pdp-simulator as per recent changes to send PdpGroup in status
[policy/models.git] / models-interactions / model-impl / cds / src / main / java / org / onap / policy / cds / client / CdsProcessorHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.cds.client;
20
21 import io.grpc.ManagedChannel;
22 import io.grpc.stub.StreamObserver;
23 import java.util.concurrent.CountDownLatch;
24 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
25 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
26 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
27 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
28 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
29 import org.onap.policy.cds.api.CdsProcessorListener;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public class CdsProcessorHandler {
34
35     private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class);
36
37     private CdsProcessorListener listener;
38
39     CdsProcessorHandler(final CdsProcessorListener listener) {
40         this.listener = listener;
41     }
42
43     CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
44         final ActionIdentifiers header = request.getActionIdentifiers();
45         LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(),
46             header.getBlueprintName(), header.getBlueprintVersion());
47
48         final CountDownLatch finishLatch = new CountDownLatch(1);
49         final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
50         final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
51             @Override
52             public void onNext(ExecutionServiceOutput output) {
53                 listener.onMessage(output);
54             }
55
56             @Override
57             public void onError(Throwable throwable) {
58                 listener.onError(throwable);
59                 finishLatch.countDown();
60             }
61
62             @Override
63             public void onCompleted() {
64                 LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
65                     header.getBlueprintName(), header.getBlueprintVersion());
66                 finishLatch.countDown();
67             }
68         };
69
70         final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
71         try {
72             // Send the message to CDS backend for processing
73             requestObserver.onNext(request);
74             // Mark the end of requests
75             requestObserver.onCompleted();
76         } catch (RuntimeException e) {
77             requestObserver.onError(e);
78         }
79         return finishLatch;
80     }
81 }