2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.plugins.event.carrier.restserver;
24 import java.util.Properties;
25 import java.util.concurrent.atomic.AtomicLong;
26 import javax.ws.rs.core.Response;
27 import lombok.AccessLevel;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.engine.event.ApexEventException;
32 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
33 import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
34 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
35 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
37 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
38 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
39 import org.onap.policy.common.gson.GsonMessageBodyHandler;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * This class implements an Apex event consumer that receives events from a REST server.
46 * @author Liam Fallon (liam.fallon@ericsson.com)
48 public class ApexRestServerConsumer extends ApexPluginsEventConsumer {
49 // Get a reference to the logger
50 private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class);
52 // The amount of time to wait in milliseconds between checks that the consumer thread has stopped
53 private static final long REST_SERVER_CONSUMER_WAIT_SLEEP_TIME = 50;
55 // The event receiver that will receive events from this consumer
56 @Setter(AccessLevel.PACKAGE)
57 private ApexEventReceiver eventReceiver;
59 // The local HTTP server to use for REST call reception if we are running a local Grizzly server
60 @Getter(AccessLevel.PACKAGE)
61 private HttpServletServer server;
63 // Holds the next identifier for event execution.
64 private static AtomicLong nextExecutionID = new AtomicLong(0L);
67 * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
70 * @return the next candidate value for a Execution ID
72 private static synchronized long getNextExecutionId() {
73 return nextExecutionID.getAndIncrement();
80 public void init(final String consumerName, final EventHandlerParameters consumerParameters,
81 final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
82 this.eventReceiver = incomingEventReceiver;
83 this.name = consumerName;
85 // Check and get the REST Properties
86 if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) {
87 final String errorMessage =
88 "specified consumer properties are not applicable to REST Server consumer (" + this.name + ")";
89 LOGGER.warn(errorMessage);
90 throw new ApexEventException(errorMessage);
93 // The REST parameters read from the parameter service
94 RestServerCarrierTechnologyParameters restConsumerProperties =
95 (RestServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
97 // Check if we are in synchronous mode
98 if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
99 final String errorMessage =
100 "REST Server consumer (" + this.name + ") must run in synchronous mode with a REST Server producer";
101 LOGGER.warn(errorMessage);
102 throw new ApexEventException(errorMessage);
105 // Check if we're in standalone mode
106 if (restConsumerProperties.isStandalone()) {
107 // Check if host and port are defined
108 if (restConsumerProperties.getHost() == null || restConsumerProperties.getPort() == -1) {
109 final String errorMessage =
110 "the parameters \"host\" and \"port\" must be defined for REST Server consumer (" + this.name
111 + ") in standalone mode";
112 LOGGER.warn(errorMessage);
113 throw new ApexEventException(errorMessage);
116 // Instantiate the standalone server
117 LOGGER.info("Creating the Apex Rest Server");
118 createServer(restConsumerProperties);
120 while (!server.isAlive()) {
121 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
125 // Register this consumer with the REST server end point
126 RestServerEndpoint.registerApexRestServerConsumer(this.name, this);
129 private void createServer(RestServerCarrierTechnologyParameters restConsumerProperties) {
131 server = HttpServletServerFactoryInstance.getServerFactory().build(
132 restConsumerProperties.getName(),
133 restConsumerProperties.isHttps(),
134 restConsumerProperties.getHost(),
135 restConsumerProperties.getPort(), null, true, false);
136 if (restConsumerProperties.isAaf()) {
137 server.addFilterClass(null, ApexRestServerAafFilter.class.getName());
139 server.addServletClass(null, RestServerEndpoint.class.getName());
140 server.addServletClass(null, AccessControlFilter.class.getName());
141 server.setSerializationProvider(GsonMessageBodyHandler.class.getName());
142 if (null != restConsumerProperties.getUserName()
143 && null != restConsumerProperties.getPassword()) {
144 server.setBasicAuthentication(restConsumerProperties.getUserName(),
145 restConsumerProperties.getPassword(), null);
150 * Receive an event for processing in Apex.
152 * @param event the event to receive
153 * @return the response from Apex
155 public Response receiveEvent(final String event) {
156 // Get an execution ID for the event
157 final long executionId = getNextExecutionId();
159 if (LOGGER.isDebugEnabled()) {
160 String message = name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event;
161 LOGGER.debug(message);
165 // Send the event into Apex
166 eventReceiver.receiveEvent(executionId, new Properties(), event);
167 } catch (final Exception e) {
168 final String errorMessage = "error receiving events on event consumer " + name;
169 LOGGER.warn(errorMessage, e);
170 return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
171 .entity("{'errorMessage', '" + errorMessage + ", " + e.getMessage() + "'}").build();
174 final SynchronousEventCache synchronousEventCache =
175 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
176 // Wait until the event is in the cache of events sent to apex
178 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
179 } while (!synchronousEventCache.existsEventToApex(executionId));
181 // Now wait for the reply or for the event to time put
183 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
185 // Check if we have received an answer from Apex
186 if (synchronousEventCache.existsEventFromApex(executionId)) {
187 // We have received a response event, read and remove the response event and remove the sent event from
189 final Object responseEvent = synchronousEventCache.removeCachedEventFromApexIfExists(executionId);
190 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
192 // Return the event as a response to the call
193 return Response.status(Response.Status.OK.getStatusCode()).entity(responseEvent.toString()).build();
195 } while (synchronousEventCache.existsEventToApex(executionId));
197 // The event timed out
198 final String errorMessage = "processing of event on event consumer " + name + " timed out, event=" + event;
199 LOGGER.warn(errorMessage);
200 return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
201 .entity("{'errorMessage', '" + errorMessage + "'}").build();
209 // Keep the consumer thread alive until it is shut down. We do not currently do anything in the thread but may
210 // do supervision in the future
211 while (consumerThread.isAlive() && !stopOrderedFlag) {
212 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
215 if (server != null) {
225 stopOrderedFlag = true;
227 while (consumerThread.isAlive()) {
228 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
230 if (server != null) {