c142da91be5e8aecd8537fa82644670a98848c02
[ccsdk/sli/adaptors.git] /
1 /*
2  * Copyright (C) 2019 Bell Canada.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.ccsdk.sli.adaptors.grpc.cds;
17
18 import com.google.protobuf.InvalidProtocolBufferException;
19 import com.google.protobuf.Struct;
20 import com.google.protobuf.Struct.Builder;
21 import io.grpc.ManagedChannel;
22 import io.grpc.Status;
23 import io.grpc.stub.StreamObserver;
24 import java.util.Map;
25 import java.util.UUID;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import org.onap.ccsdk.apps.controllerblueprints.common.api.ActionIdentifiers;
30 import org.onap.ccsdk.apps.controllerblueprints.common.api.CommonHeader;
31 import org.onap.ccsdk.apps.controllerblueprints.common.api.Flag;
32 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
33 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
34 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput;
35 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput;
36 import org.onap.ccsdk.sli.adaptors.grpc.JsonFormat;
37 import org.onap.ccsdk.sli.adaptors.grpc.Utils;
38 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
39 import org.onap.ccsdk.sli.core.sli.SvcLogicResource.QueryStatus;
40 import org.onap.ccsdk.sli.core.slipluginutils.SliPluginUtils;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 public class BlueprintProcessingHandler {
45
46     private static final Logger log = LoggerFactory.getLogger(BlueprintProcessingHandler.class);
47
48     private static final String CCSDK_ORIGINATOR = "CCSDK";
49     private static final String IS_FORCE_PROP = "is_force";
50     private static final String TTL_PROP = "ttl";
51     private static final String BLUEPRINT_NAME_PROP = "blueprint_name";
52     private static final String BLUEPRINT_VERSION_PROP = "blueprint_version";
53     private static final String ACTION_PROP = "action";
54     private static final String MODE_PROP = "mode";
55     private static final String PAYLOAD_PROP = "payload";
56
57     QueryStatus process(final Map<String, String> parameters, final ManagedChannel channel) {
58         try {
59             SliPluginUtils.checkParameters(parameters,
60                 new String[]{BLUEPRINT_NAME_PROP, BLUEPRINT_VERSION_PROP, ACTION_PROP, MODE_PROP}, log);
61         } catch (SvcLogicException e) {
62             return QueryStatus.FAILURE;
63         }
64
65         final boolean isForce = Boolean.getBoolean(parameters.get(IS_FORCE_PROP));
66         final int ttl = Integer.parseInt(parameters.get(TTL_PROP));
67         final String blueprintName = parameters.get(BLUEPRINT_NAME_PROP);
68         final String blueprintVersion = parameters.get(BLUEPRINT_VERSION_PROP);
69         final String action = parameters.get(ACTION_PROP);
70         final String mode = parameters.get(MODE_PROP);
71         final String payload = parameters.get(PAYLOAD_PROP);
72
73         log.info("Processing blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
74
75         final AtomicReference<QueryStatus> responseStatus = new AtomicReference<>();
76         final CountDownLatch finishLatch = new CountDownLatch(1);
77
78         final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
79
80         final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
81             @Override
82             public void onNext(ExecutionServiceOutput output) {
83                 log.info("onNext: {}", output);
84             }
85
86             @Override
87             public void onError(Throwable t) {
88                 Status status = Status.fromThrowable(t);
89                 log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName,
90                     action, status);
91                 responseStatus.compareAndSet(null, QueryStatus.FAILURE);
92                 finishLatch.countDown();
93             }
94
95             @Override
96             public void onCompleted() {
97                 log.info("Completed blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
98                 responseStatus.compareAndSet(null, QueryStatus.SUCCESS);
99                 finishLatch.countDown();
100             }
101         };
102
103         final CommonHeader commonHeader = CommonHeader.newBuilder()
104             .setOriginatorId(CCSDK_ORIGINATOR)
105             .setRequestId(UUID.randomUUID().toString())
106             .setTimestamp(Utils.timestamp())
107             .setFlag(Flag.newBuilder()
108                 .setIsForce(isForce)
109                 .setTtl(ttl)
110                 .build())
111             .build();
112         final ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
113             .setActionName(action)
114             .setBlueprintName(blueprintName)
115             .setBlueprintVersion(blueprintVersion)
116             .setMode(mode)
117             .build();
118
119         Builder struct = Struct.newBuilder();
120         try {
121             JsonFormat.parser().merge(payload, struct);
122         } catch (InvalidProtocolBufferException e) {
123             log.error("Failed converting payload for blueprint({}:{}) for action({}). {}", blueprintVersion,
124                 blueprintName, action, e);
125             return QueryStatus.FAILURE;
126         }
127
128         final ExecutionServiceInput request = ExecutionServiceInput.newBuilder()
129             .setActionIdentifiers(actionIdentifiers)
130             .setPayload(struct.build())
131             .setCommonHeader(commonHeader).build();
132
133         final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
134
135         try {
136             requestObserver.onNext(request);
137         } catch (RuntimeException e) {
138             requestObserver.onError(e);
139             return QueryStatus.FAILURE;
140         }
141
142         requestObserver.onCompleted();
143         try {
144             finishLatch.await(1, TimeUnit.MINUTES);
145         } catch (InterruptedException e) {
146             log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName, action,
147                 e);
148             Thread.currentThread().interrupt();
149             return QueryStatus.FAILURE;
150         }
151
152         return responseStatus.get();
153     }
154 }