410c83c4088283da8019bd269075565c97a89bce
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2020 Nordix Foundation.
4  *  Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.plugins.event.carrier.grpc;
23
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.util.JsonFormat;
26 import java.util.Properties;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
31 import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
32 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
33 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
35 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
36 import org.onap.policy.apex.service.engine.event.ApexEventException;
37 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
38 import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
39 import org.onap.policy.apex.service.engine.event.PeeredReference;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.onap.policy.cds.api.CdsProcessorListener;
43 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
44 import org.onap.policy.cds.properties.CdsServerProperties;
45 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Concrete implementation of an Apex gRPC plugin that manages to send a GRPC request.
51  *
52  * @author Ajith Sreekumar(ajith.sreekumar@est.tech)
53  *
54  */
55 public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsProcessorListener {
56     private static final Logger LOGGER = LoggerFactory.getLogger(ApexGrpcProducer.class);
57
58     private CdsServerProperties props;
59     // The gRPC client
60     private CdsProcessorGrpcClient client;
61
62     private AtomicReference<ExecutionServiceOutput> cdsResponse = new AtomicReference<>();
63
64     /**
65      * {@inheritDoc}.
66      */
67     @Override
68     public void init(final String producerName, final EventHandlerParameters producerParameters)
69         throws ApexEventException {
70         this.name = producerName;
71
72         // Check and get the gRPC Properties
73         if (!(producerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) {
74             final String errorMessage =
75                 "Specified producer properties are not applicable to gRPC producer (" + this.name + ")";
76             throw new ApexEventException(errorMessage);
77         }
78         GrpcCarrierTechnologyParameters grpcProducerProperties =
79             (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
80         grpcProducerProperties.validateGrpcParameters(true);
81         client = makeGrpcClient(grpcProducerProperties);
82     }
83
84     private CdsProcessorGrpcClient makeGrpcClient(GrpcCarrierTechnologyParameters grpcProducerProperties) {
85         props = new CdsServerProperties();
86         props.setHost(grpcProducerProperties.getHost());
87         props.setPort(grpcProducerProperties.getPort());
88         props.setUsername(grpcProducerProperties.getUsername());
89         props.setPassword(grpcProducerProperties.getPassword());
90         props.setTimeout(grpcProducerProperties.getTimeout());
91
92         return new CdsProcessorGrpcClient(this, props);
93     }
94
95     /**
96      * {@inheritDoc}.
97      */
98     @Override
99     public void sendEvent(final long executionId, final Properties executionProperties, final String eventName,
100         final Object event) {
101
102         ExecutionServiceInput executionServiceInput;
103         Builder builder = ExecutionServiceInput.newBuilder();
104         try {
105             JsonFormat.parser().ignoringUnknownFields().merge((String) event, builder);
106             executionServiceInput = builder.build();
107         } catch (InvalidProtocolBufferException e) {
108             throw new ApexEventRuntimeException(
109                 "Incoming Event cannot be converted to ExecutionServiceInput type for gRPC request." + e.getMessage());
110         }
111         try {
112             CountDownLatch countDownLatch = client.sendRequest(executionServiceInput);
113             if (!countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS)) {
114                 cdsResponse.set(ExecutionServiceOutput.newBuilder().setStatus(Status.newBuilder()
115                     .setErrorMessage(CdsActorConstants.TIMED_OUT).setEventType(EventType.EVENT_COMPONENT_FAILURE))
116                     .build());
117                 LOGGER.error("gRPC Request timed out.");
118             }
119         } catch (InterruptedException e) {
120             LOGGER.error("gRPC request failed. {}", e.getMessage());
121             cdsResponse.set(ExecutionServiceOutput.newBuilder().setStatus(Status.newBuilder()
122                 .setErrorMessage(CdsActorConstants.INTERRUPTED).setEventType(EventType.EVENT_COMPONENT_FAILURE))
123                 .build());
124             Thread.currentThread().interrupt();
125         }
126
127         if (!EventType.EVENT_COMPONENT_EXECUTED.equals(cdsResponse.get().getStatus().getEventType())) {
128             LOGGER.error("Sending event \"{}\" by {} to CDS failed.", eventName, this.name);
129         }
130
131         consumeEvent(executionId, cdsResponse.get());
132     }
133
134     private void consumeEvent(long executionId, ExecutionServiceOutput event) {
135         // Find the peered consumer for this producer
136         final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
137         if (peeredRequestorReference == null) {
138             return;
139         }
140         // Find the gRPC Response Consumer that will take in the response to APEX Engine
141         final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
142         if (!(consumer instanceof ApexGrpcConsumer)) {
143             final String errorMessage = "Recieve of gRPC response by APEX failed,"
144                 + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event;
145             throw new ApexEventRuntimeException(errorMessage);
146         }
147
148         // Use the consumer to consume this response event in APEX
149         final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
150         try {
151             grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
152                 JsonFormat.printer().print(event));
153         } catch (ApexEventException | InvalidProtocolBufferException e) {
154             throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
155         }
156     }
157
158     /**
159      * {@inheritDoc}.
160      */
161     @Override
162     public void stop() {
163         client.close();
164     }
165
166     @Override
167     public void onMessage(ExecutionServiceOutput message) {
168         cdsResponse.set(message);
169     }
170
171     @Override
172     public void onError(Throwable throwable) {
173         String errorMsg = throwable.getLocalizedMessage();
174         cdsResponse.set(ExecutionServiceOutput.newBuilder()
175             .setStatus(Status.newBuilder().setErrorMessage(errorMsg).setEventType(EventType.EVENT_COMPONENT_FAILURE))
176             .build());
177         LOGGER.error("Failed processing blueprint {}", errorMsg, throwable);
178     }
179 }