a8c5086592cd719824cd69cca31555a2fcb7d2ca
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.restserver;
22
23 import java.net.URI;
24 import java.util.EnumMap;
25 import java.util.Map;
26 import java.util.Properties;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import javax.ws.rs.core.Response;
30
31 import org.glassfish.grizzly.http.server.HttpServer;
32 import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
33 import org.glassfish.jersey.server.ResourceConfig;
34 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
35 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
36 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
37 import org.onap.policy.apex.service.engine.event.ApexEventException;
38 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
39 import org.onap.policy.apex.service.engine.event.PeeredReference;
40 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
42 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * This class implements an Apex event consumer that receives events from a REST server.
48  *
49  * @author Liam Fallon (liam.fallon@ericsson.com)
50  */
51 public class ApexRestServerConsumer implements ApexEventConsumer, Runnable {
52     // Get a reference to the logger
53     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class);
54
55     private static final String BASE_URI_TEMPLATE = "http://%s:%d/apex";
56
57     // The amount of time to wait in milliseconds between checks that the consumer thread has stopped
58     private static final long REST_SERVER_CONSUMER_WAIT_SLEEP_TIME = 50;
59
60     // The event receiver that will receive events from this consumer
61     private ApexEventReceiver eventReceiver;
62
63     // The name for this consumer
64     private String name = null;
65
66     // The peer references for this event handler
67     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
68
69     // The consumer thread and stopping flag
70     private Thread consumerThread;
71     private boolean stopOrderedFlag = false;
72
73     // The local HTTP server to use for REST call reception if we are running a local Grizzly server
74     private HttpServer server;
75
76     // Holds the next identifier for event execution.
77     private static AtomicLong nextExecutionID = new AtomicLong(0L);
78
79     /**
80      * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
81      * JVM
82      *
83      * @return the next candidate value for a Execution ID
84      */
85     private static synchronized long getNextExecutionId() {
86         return nextExecutionID.getAndIncrement();
87     }
88
89     /**
90      * {@inheritDoc}.
91      */
92     @Override
93     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
94             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
95         this.eventReceiver = incomingEventReceiver;
96         this.name = consumerName;
97
98         // Check and get the REST Properties
99         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) {
100             final String errorMessage =
101                     "specified consumer properties are not applicable to REST Server consumer (" + this.name + ")";
102             LOGGER.warn(errorMessage);
103             throw new ApexEventException(errorMessage);
104         }
105
106         // The REST parameters read from the parameter service
107         RestServerCarrierTechnologyParameters restConsumerProperties =
108                 (RestServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
109
110         // Check if we are in synchronous mode
111         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
112             final String errorMessage =
113                     "REST Server consumer (" + this.name + ") must run in synchronous mode with a REST Server producer";
114             LOGGER.warn(errorMessage);
115             throw new ApexEventException(errorMessage);
116         }
117
118         // Check if we're in standalone mode
119         if (restConsumerProperties.isStandalone()) {
120             // Check if host and port are defined
121             if (restConsumerProperties.getHost() == null || restConsumerProperties.getPort() == -1) {
122                 final String errorMessage =
123                         "the parameters \"host\" and \"port\" must be defined for REST Server consumer (" + this.name
124                                 + ") in standalone mode";
125                 LOGGER.warn(errorMessage);
126                 throw new ApexEventException(errorMessage);
127             }
128
129             // Compose the URI for the standalone server
130             final String baseUrl = String.format(BASE_URI_TEMPLATE, restConsumerProperties.getHost(),
131                     restConsumerProperties.getPort());
132
133             // Instantiate the standalone server
134             final ResourceConfig rc = new ResourceConfig(RestServerEndpoint.class, AccessControlFilter.class);
135             server = GrizzlyHttpServerFactory.createHttpServer(URI.create(baseUrl), rc);
136
137             while (!server.isStarted()) {
138                 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
139             }
140         }
141
142         // Register this consumer with the REST server end point
143         RestServerEndpoint.registerApexRestServerConsumer(this.name, this);
144     }
145
146     /**
147      * {@inheritDoc}.
148      */
149     @Override
150     public void start() {
151         // Configure and start the event reception thread
152         final String threadName = this.getClass().getName() + ":" + this.name;
153         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
154         consumerThread.setDaemon(true);
155         consumerThread.start();
156     }
157
158     /**
159      * {@inheritDoc}.
160      */
161     @Override
162     public String getName() {
163         return name;
164     }
165
166     /**
167      * {@inheritDoc}.
168      */
169     @Override
170     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
171         return peerReferenceMap.get(peeredMode);
172     }
173
174     /**
175      * {@inheritDoc}.
176      */
177     @Override
178     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
179         peerReferenceMap.put(peeredMode, peeredReference);
180     }
181
182     /**
183      * Receive an event for processing in Apex.
184      *
185      * @param event the event to receive
186      * @return the response from Apex
187      */
188     public Response receiveEvent(final String event) {
189         // Get an execution ID for the event
190         final long executionId = getNextExecutionId();
191
192         if (LOGGER.isDebugEnabled()) {
193             String message = name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event;
194             LOGGER.debug(message);
195         }
196
197         try {
198             // Send the event into Apex
199             eventReceiver.receiveEvent(executionId, new Properties(), event);
200         } catch (final Exception e) {
201             final String errorMessage = "error receiving events on event consumer " + name + ", " + e.getMessage();
202             LOGGER.warn(errorMessage, e);
203             return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
204                     .entity("{'errorMessage', '" + errorMessage + "'}").build();
205         }
206
207         final SynchronousEventCache synchronousEventCache =
208                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
209         // Wait until the event is in the cache of events sent to apex
210         do {
211             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
212         }
213         while (!synchronousEventCache.existsEventToApex(executionId));
214
215         // Now wait for the reply or for the event to time put
216         do {
217             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
218
219             // Check if we have received an answer from Apex
220             if (synchronousEventCache.existsEventFromApex(executionId)) {
221                 // We have received a response event, read and remove the response event and remove the sent event from
222                 // the cache
223                 final Object responseEvent = synchronousEventCache.removeCachedEventFromApexIfExists(executionId);
224                 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
225
226                 // Return the event as a response to the call
227                 return Response.status(Response.Status.OK.getStatusCode()).entity(responseEvent.toString()).build();
228             }
229         }
230         while (synchronousEventCache.existsEventToApex(executionId));
231
232         // The event timed out
233         final String errorMessage = "processing of event on event consumer " + name + " timed out, event=" + event;
234         LOGGER.warn(errorMessage);
235         return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
236                 .entity("{'errorMessage', '" + errorMessage + "'}").build();
237     }
238
239     /**
240      * {@inheritDoc}.
241      */
242     @Override
243     public void run() {
244         // Keep the consumer thread alive until it is shut down. We do not currently do anything in the thread but may
245         // do supervision in the future
246         while (consumerThread.isAlive() && !stopOrderedFlag) {
247             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
248         }
249
250         if (server != null) {
251             server.shutdown();
252         }
253     }
254
255     /**
256      * {@inheritDoc}.
257      */
258     @Override
259     public void stop() {
260         stopOrderedFlag = true;
261
262         while (consumerThread.isAlive()) {
263             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
264         }
265     }
266 }