9cb7f765c7242c482957212729a321f98a102c01
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.plugins.event.carrier.restserver;
22
23 import java.net.URI;
24 import java.util.EnumMap;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import javax.ws.rs.core.Response;
29
30 import org.glassfish.grizzly.http.server.HttpServer;
31 import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
32 import org.glassfish.jersey.server.ResourceConfig;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
36 import org.onap.policy.apex.service.engine.event.ApexEventException;
37 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
38 import org.onap.policy.apex.service.engine.event.PeeredReference;
39 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
40 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
41 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * This class implements an Apex event consumer that receives events from a REST server.
47  *
48  * @author Liam Fallon (liam.fallon@ericsson.com)
49  */
50 public class ApexRestServerConsumer implements ApexEventConsumer, Runnable {
51     // Get a reference to the logger
52     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class);
53
54     private static final String BASE_URI_TEMPLATE = "http://%s:%d/apex";
55
56     // The amount of time to wait in milliseconds between checks that the consumer thread has stopped
57     private static final long REST_SERVER_CONSUMER_WAIT_SLEEP_TIME = 50;
58
59     // The event receiver that will receive events from this consumer
60     private ApexEventReceiver eventReceiver;
61
62     // The name for this consumer
63     private String name = null;
64
65     // The peer references for this event handler
66     private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
67
68     // The consumer thread and stopping flag
69     private Thread consumerThread;
70     private boolean stopOrderedFlag = false;
71
72     // The local HTTP server to use for REST call reception if we are running a local Grizzly server
73     private HttpServer server;
74
75     // Holds the next identifier for event execution.
76     private static AtomicLong nextExecutionID = new AtomicLong(0L);
77
78     /**
79      * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
80      * JVM
81      *
82      * @return the next candidate value for a Execution ID
83      */
84     private static synchronized long getNextExecutionId() {
85         return nextExecutionID.getAndIncrement();
86     }
87
88     /**
89      * {@inheritDoc}.
90      */
91     @Override
92     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
93             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
94         this.eventReceiver = incomingEventReceiver;
95         this.name = consumerName;
96
97         // Check and get the REST Properties
98         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) {
99             final String errorMessage =
100                     "specified consumer properties are not applicable to REST Server consumer (" + this.name + ")";
101             LOGGER.warn(errorMessage);
102             throw new ApexEventException(errorMessage);
103         }
104
105         // The REST parameters read from the parameter service
106         RestServerCarrierTechnologyParameters restConsumerProperties =
107                 (RestServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
108
109         // Check if we are in synchronous mode
110         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
111             final String errorMessage =
112                     "REST Server consumer (" + this.name + ") must run in synchronous mode with a REST Server producer";
113             LOGGER.warn(errorMessage);
114             throw new ApexEventException(errorMessage);
115         }
116
117         // Check if we're in standalone mode
118         if (restConsumerProperties.isStandalone()) {
119             // Check if host and port are defined
120             if (restConsumerProperties.getHost() == null || restConsumerProperties.getPort() == -1) {
121                 final String errorMessage =
122                         "the parameters \"host\" and \"port\" must be defined for REST Server consumer (" + this.name
123                                 + ") in standalone mode";
124                 LOGGER.warn(errorMessage);
125                 throw new ApexEventException(errorMessage);
126             }
127
128             // Compose the URI for the standalone server
129             final String baseUrl = String.format(BASE_URI_TEMPLATE, restConsumerProperties.getHost(),
130                     restConsumerProperties.getPort());
131
132             // Instantiate the standalone server
133             final ResourceConfig rc = new ResourceConfig(RestServerEndpoint.class, AccessControlFilter.class);
134             server = GrizzlyHttpServerFactory.createHttpServer(URI.create(baseUrl), rc);
135
136             while (!server.isStarted()) {
137                 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
138             }
139         }
140
141         // Register this consumer with the REST server end point
142         RestServerEndpoint.registerApexRestServerConsumer(this.name, this);
143     }
144
145     /**
146      * {@inheritDoc}.
147      */
148     @Override
149     public void start() {
150         // Configure and start the event reception thread
151         final String threadName = this.getClass().getName() + ":" + this.name;
152         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
153         consumerThread.setDaemon(true);
154         consumerThread.start();
155     }
156
157     /**
158      * {@inheritDoc}.
159      */
160     @Override
161     public String getName() {
162         return name;
163     }
164
165     /**
166      * {@inheritDoc}.
167      */
168     @Override
169     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
170         return peerReferenceMap.get(peeredMode);
171     }
172
173     /**
174      * {@inheritDoc}.
175      */
176     @Override
177     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
178         peerReferenceMap.put(peeredMode, peeredReference);
179     }
180
181     /**
182      * Receive an event for processing in Apex.
183      *
184      * @param event the event to receive
185      * @return the response from Apex
186      */
187     public Response receiveEvent(final String event) {
188         // Get an execution ID for the event
189         final long executionId = getNextExecutionId();
190
191         if (LOGGER.isDebugEnabled()) {
192             String message = name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event;
193             LOGGER.debug(message);
194         }
195
196         try {
197             // Send the event into Apex
198             eventReceiver.receiveEvent(executionId, null, event);
199         } catch (final Exception e) {
200             final String errorMessage = "error receiving events on event consumer " + name + ", " + e.getMessage();
201             LOGGER.warn(errorMessage, e);
202             return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
203                     .entity("{'errorMessage', '" + errorMessage + "'}").build();
204         }
205
206         final SynchronousEventCache synchronousEventCache =
207                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
208         // Wait until the event is in the cache of events sent to apex
209         do {
210             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
211         }
212         while (!synchronousEventCache.existsEventToApex(executionId));
213
214         // Now wait for the reply or for the event to time put
215         do {
216             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
217
218             // Check if we have received an answer from Apex
219             if (synchronousEventCache.existsEventFromApex(executionId)) {
220                 // We have received a response event, read and remove the response event and remove the sent event from
221                 // the cache
222                 final Object responseEvent = synchronousEventCache.removeCachedEventFromApexIfExists(executionId);
223                 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
224
225                 // Return the event as a response to the call
226                 return Response.status(Response.Status.OK.getStatusCode()).entity(responseEvent.toString()).build();
227             }
228         }
229         while (synchronousEventCache.existsEventToApex(executionId));
230
231         // The event timed out
232         final String errorMessage = "processing of event on event consumer " + name + " timed out, event=" + event;
233         LOGGER.warn(errorMessage);
234         return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
235                 .entity("{'errorMessage', '" + errorMessage + "'}").build();
236     }
237
238     /**
239      * {@inheritDoc}.
240      */
241     @Override
242     public void run() {
243         // Keep the consumer thread alive until it is shut down. We do not currently do anything in the thread but may
244         // do supervision in the future
245         while (consumerThread.isAlive() && !stopOrderedFlag) {
246             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
247         }
248
249         if (server != null) {
250             server.shutdown();
251         }
252     }
253
254     /**
255      * {@inheritDoc}.
256      */
257     @Override
258     public void stop() {
259         stopOrderedFlag = true;
260
261         while (consumerThread.isAlive()) {
262             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
263         }
264     }
265 }