5f4fad42ae6e5f240790748155c11406ac5304db
[ccsdk/sli/adaptors.git] /
1 /*
2  * Copyright (C) 2019 Bell Canada.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onap.ccsdk.sli.adaptors.grpc.cds;
17
18 import com.google.common.collect.Maps;
19 import com.google.protobuf.InvalidProtocolBufferException;
20 import com.google.protobuf.Struct;
21 import com.google.protobuf.Struct.Builder;
22 import io.grpc.ManagedChannel;
23 import io.grpc.Status;
24 import io.grpc.stub.StreamObserver;
25 import java.util.Map;
26 import java.util.UUID;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
31 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
32 import org.onap.ccsdk.cds.controllerblueprints.common.api.Flag;
33 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
37 import org.onap.ccsdk.sli.adaptors.grpc.JsonFormat;
38 import org.onap.ccsdk.sli.adaptors.grpc.Utils;
39 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
40 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
41 import org.onap.ccsdk.sli.core.sli.SvcLogicResource.QueryStatus;
42 import org.onap.ccsdk.sli.core.slipluginutils.SliPluginUtils;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 class BlueprintProcessingHandler {
47
48     private static final Logger log = LoggerFactory.getLogger(BlueprintProcessingHandler.class);
49
50     private static final int DEFAULT_TTL = 180;
51     private static final String CCSDK_ORIGINATOR = "CCSDK";
52     private static final String IS_FORCE_PROP = "is_force";
53     private static final String TTL_PROP = "ttl";
54     private static final String BLUEPRINT_NAME_PROP = "blueprint_name";
55     private static final String BLUEPRINT_VERSION_PROP = "blueprint_version";
56     private static final String ACTION_PROP = "action";
57     private static final String MODE_PROP = "mode";
58     private static final String PAYLOAD_PROP = "payload";
59     private static final String PREFIX_PROP = "prefix";
60
61     QueryStatus process(final Map<String, String> parameters, final ManagedChannel channel, final SvcLogicContext ctx) {
62         try {
63             SliPluginUtils.checkParameters(parameters,
64                 new String[]{BLUEPRINT_NAME_PROP, BLUEPRINT_VERSION_PROP, ACTION_PROP, MODE_PROP, PREFIX_PROP}, log);
65         } catch (SvcLogicException e) {
66             return QueryStatus.FAILURE;
67         }
68
69         final boolean isForce = Boolean.getBoolean(parameters.get(IS_FORCE_PROP));
70         int ttl = Integer.parseInt(parameters.get(TTL_PROP));
71         if (ttl == 0) {
72             ttl = DEFAULT_TTL;
73         }
74         final String blueprintName = parameters.get(BLUEPRINT_NAME_PROP);
75         final String blueprintVersion = parameters.get(BLUEPRINT_VERSION_PROP);
76         final String action = parameters.get(ACTION_PROP);
77         final String mode = parameters.get(MODE_PROP);
78         final String payload = parameters.get(PAYLOAD_PROP);
79         final String prefix = parameters.get(PREFIX_PROP);
80
81         log.info("Processing blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
82
83         final AtomicReference<QueryStatus> responseStatus = new AtomicReference<>();
84         final CountDownLatch finishLatch = new CountDownLatch(1);
85
86         final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
87
88         final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
89             @Override
90             public void onNext(ExecutionServiceOutput output) {
91                 log.info("onNext: {}", output);
92
93                 Map<String, String> jsonToCtx = Maps.newHashMap();
94                 String json = "";
95                 try {
96                     json = JsonFormat.printer().print(output);
97                 } catch (InvalidProtocolBufferException e) {
98                     log.error("Failed to parse received message. blueprint({}:{}) for action({}). {}", blueprintVersion,
99                         blueprintName, action, output, e);
100                     responseStatus.compareAndSet(null, QueryStatus.FAILURE);
101                     finishLatch.countDown();
102                 }
103
104                 ctx.setAttribute("BlueprintProcessingHandler_process", json);
105                 jsonToCtx.put("source", "BlueprintProcessingHandler_process");
106                 jsonToCtx.put("outputPath", prefix);
107                 jsonToCtx.put("isEscaped", Boolean.FALSE.toString());
108
109                 try {
110                     SliPluginUtils.jsonStringToCtx(jsonToCtx, ctx);
111                 } catch (SvcLogicException e) {
112                     log.error("Failed to put jsonStringToCtx. blueprint({}:{}) for action({}). {}", blueprintVersion,
113                         blueprintName, action, output, e);
114                     responseStatus.compareAndSet(null, QueryStatus.FAILURE);
115                     finishLatch.countDown();
116                 }
117             }
118
119             @Override
120             public void onError(Throwable t) {
121                 Status status = Status.fromThrowable(t);
122                 log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName,
123                     action, status);
124                 responseStatus.compareAndSet(null, QueryStatus.FAILURE);
125                 finishLatch.countDown();
126             }
127
128             @Override
129             public void onCompleted() {
130                 log.info("Completed blueprint({}:{}) for action({})", blueprintVersion, blueprintName, action);
131                 responseStatus.compareAndSet(null, QueryStatus.SUCCESS);
132                 finishLatch.countDown();
133             }
134         };
135
136         final CommonHeader commonHeader = CommonHeader.newBuilder()
137             .setOriginatorId(CCSDK_ORIGINATOR)
138             .setRequestId(UUID.randomUUID().toString())
139             .setTimestamp(Utils.timestamp())
140             .setFlag(Flag.newBuilder()
141                 .setIsForce(isForce)
142                 .setTtl(ttl)
143                 .build())
144             .build();
145         final ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
146             .setActionName(action)
147             .setBlueprintName(blueprintName)
148             .setBlueprintVersion(blueprintVersion)
149             .setMode(mode)
150             .build();
151
152         Builder struct = Struct.newBuilder();
153         try {
154             JsonFormat.parser().merge(payload, struct);
155         } catch (InvalidProtocolBufferException e) {
156             log.error("Failed converting payload for blueprint({}:{}) for action({}). {}", blueprintVersion,
157                 blueprintName, action, e);
158             return QueryStatus.FAILURE;
159         }
160
161         final ExecutionServiceInput request = ExecutionServiceInput.newBuilder()
162             .setActionIdentifiers(actionIdentifiers)
163             .setPayload(struct.build())
164             .setCommonHeader(commonHeader).build();
165
166         final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
167
168         try {
169             requestObserver.onNext(request);
170         } catch (RuntimeException e) {
171             requestObserver.onError(e);
172             return QueryStatus.FAILURE;
173         }
174
175         requestObserver.onCompleted();
176         try {
177             finishLatch.await(ttl, TimeUnit.SECONDS);
178         } catch (InterruptedException e) {
179             log.error("Failed processing blueprint({}:{}) for action({}). {}", blueprintVersion, blueprintName, action,
180                 e);
181             Thread.currentThread().interrupt();
182             return QueryStatus.FAILURE;
183         }
184
185         return responseStatus.get();
186     }
187 }