/*
 * Copyright (C) 2019 Bell Canada.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package org.onap.ccsdk.sli.adaptors.grpc.cds;
18 import com.google.protobuf.InvalidProtocolBufferException;
19 import com.google.protobuf.Struct;
20 import io.grpc.ManagedChannel;
21 import io.grpc.Status;
22 import io.grpc.stub.StreamObserver;
24 import java.util.UUID;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28 import org.onap.ccsdk.apps.controllerblueprints.common.api.ActionIdentifiers;
29 import org.onap.ccsdk.apps.controllerblueprints.common.api.CommonHeader;
30 import org.onap.ccsdk.apps.controllerblueprints.common.api.Flag;
31 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
32 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
33 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput;
34 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput;
35 import org.onap.ccsdk.sli.adaptors.grpc.Utils;
36 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
37 import org.onap.ccsdk.sli.core.sli.SvcLogicResource.QueryStatus;
38 import org.onap.ccsdk.sli.core.slipluginutils.SliPluginUtils;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 public class BlueprintProcessingHandler {
44 private static final Logger log = LoggerFactory.getLogger(BlueprintProcessingHandler.class);
46 private static final String CCSDK_ORIGINATOR = "CCSDK";
47 private static final String IS_FORCE_PROP = "is_force";
48 private static final String TTL_PROP = "ttl";
49 private static final String BLUEPRINT_NAME_PROP = "blueprint_name";
50 private static final String BLUEPRINT_VERSION_PROP = "blueprint_version";
51 private static final String ACTION_PROP = "action";
52 private static final String MODE_PROP = "mode";
53 private static final String PAYLOAD_PROP = "payload";
55 QueryStatus process(final Map<String, String> parameters, final ManagedChannel channel) {
57 SliPluginUtils.checkParameters(parameters,
58 new String[]{BLUEPRINT_NAME_PROP, BLUEPRINT_VERSION_PROP, ACTION_PROP, MODE_PROP}, log);
59 } catch (SvcLogicException e) {
60 return QueryStatus.FAILURE;
63 final boolean isForce = Boolean.getBoolean(parameters.get(IS_FORCE_PROP));
64 final int ttl = Integer.parseInt(parameters.get(TTL_PROP));
65 final String blueprintName = parameters.get(BLUEPRINT_NAME_PROP);
66 final String blueprintVersion = parameters.get(BLUEPRINT_VERSION_PROP);
67 final String action = parameters.get(ACTION_PROP);
68 final String mode = parameters.get(MODE_PROP);
69 final String payload = parameters.get(PAYLOAD_PROP);
71 log.info("Processing blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
73 final AtomicReference<QueryStatus> responseStatus = new AtomicReference<>();
74 final CountDownLatch finishLatch = new CountDownLatch(1);
76 final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
78 final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
80 public void onNext(ExecutionServiceOutput output) {
81 log.info("onNext: {}", output);
85 public void onError(Throwable t) {
86 Status status = Status.fromThrowable(t);
87 log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName,
89 responseStatus.compareAndSet(null, QueryStatus.FAILURE);
90 finishLatch.countDown();
94 public void onCompleted() {
95 log.info("Completed blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
96 responseStatus.compareAndSet(null, QueryStatus.SUCCESS);
97 finishLatch.countDown();
101 final CommonHeader commonHeader = CommonHeader.newBuilder()
102 .setOriginatorId(CCSDK_ORIGINATOR)
103 .setRequestId(UUID.randomUUID().toString())
104 .setTimestamp(Utils.timestamp())
105 .setFlag(Flag.newBuilder()
110 final ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
111 .setActionName(action)
112 .setBlueprintName(blueprintName)
113 .setBlueprintVersion(blueprintVersion)
119 struct = Struct.newBuilder().mergeFrom(payload.getBytes()).build();
120 } catch (InvalidProtocolBufferException e) {
121 log.error("Failed converting payload for blueprint({}:{}) for action({}). {}", blueprintVersion,
122 blueprintName, action, e);
123 return QueryStatus.FAILURE;
126 final ExecutionServiceInput request = ExecutionServiceInput.newBuilder()
127 .setActionIdentifiers(actionIdentifiers)
129 .setCommonHeader(commonHeader).build();
131 final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
134 requestObserver.onNext(request);
135 } catch (RuntimeException e) {
136 requestObserver.onError(e);
137 return QueryStatus.FAILURE;
140 requestObserver.onCompleted();
142 finishLatch.await(1, TimeUnit.MINUTES);
143 } catch (InterruptedException e) {
144 log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName, action,
146 Thread.currentThread().interrupt();
147 return QueryStatus.FAILURE;
150 return responseStatus.get();