Update for SNI Chcecking
[policy/apex-pdp.git] / plugins / plugins-event / plugins-event-carrier / plugins-event-carrier-restserver / src / main / java / org / onap / policy / apex / plugins / event / carrier / restserver / ApexRestServerConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020,2023 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.plugins.event.carrier.restserver;
24
25 import java.util.Properties;
26 import java.util.concurrent.atomic.AtomicLong;
27 import javax.ws.rs.core.Response;
28 import lombok.AccessLevel;
29 import lombok.Getter;
30 import lombok.Setter;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.engine.event.ApexEventException;
33 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
34 import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
35 import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
38 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
39 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
40 import org.onap.policy.common.gson.GsonMessageBodyHandler;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * This class implements an Apex event consumer that receives events from a REST server.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 public class ApexRestServerConsumer extends ApexPluginsEventConsumer {
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestServerConsumer.class);
52
53     // The amount of time to wait in milliseconds between checks that the consumer thread has stopped
54     private static final long REST_SERVER_CONSUMER_WAIT_SLEEP_TIME = 50;
55
56     // The event receiver that will receive events from this consumer
57     @Setter(AccessLevel.PACKAGE)
58     private ApexEventReceiver eventReceiver;
59
60     // The local HTTP server to use for REST call reception if we are running a local Grizzly server
61     @Getter(AccessLevel.PACKAGE)
62     private HttpServletServer server;
63
64     // Holds the next identifier for event execution.
65     private static AtomicLong nextExecutionID = new AtomicLong(0L);
66
67     /**
68      * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
69      * JVM
70      *
71      * @return the next candidate value for a Execution ID
72      */
73     private static synchronized long getNextExecutionId() {
74         return nextExecutionID.getAndIncrement();
75     }
76
77     /**
78      * {@inheritDoc}.
79      */
80     @Override
81     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
82             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
83         this.eventReceiver = incomingEventReceiver;
84         this.name = consumerName;
85
86         // Check and get the REST Properties
87         if (!(consumerParameters.getCarrierTechnologyParameters() instanceof RestServerCarrierTechnologyParameters)) {
88             final String errorMessage =
89                     "specified consumer properties are not applicable to REST Server consumer (" + this.name + ")";
90             LOGGER.warn(errorMessage);
91             throw new ApexEventException(errorMessage);
92         }
93
94         // The REST parameters read from the parameter service
95         RestServerCarrierTechnologyParameters restConsumerProperties =
96                 (RestServerCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
97
98         // Check if we are in synchronous mode
99         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
100             final String errorMessage =
101                     "REST Server consumer (" + this.name + ") must run in synchronous mode with a REST Server producer";
102             LOGGER.warn(errorMessage);
103             throw new ApexEventException(errorMessage);
104         }
105
106         // Check if we're in standalone mode
107         if (restConsumerProperties.isStandalone()) {
108             // Check if host and port are defined
109             if (restConsumerProperties.getHost() == null || restConsumerProperties.getPort() == -1) {
110                 final String errorMessage =
111                         "the parameters \"host\" and \"port\" must be defined for REST Server consumer (" + this.name
112                                 + ") in standalone mode";
113                 LOGGER.warn(errorMessage);
114                 throw new ApexEventException(errorMessage);
115             }
116
117             // Instantiate the standalone server
118             LOGGER.info("Creating the Apex Rest Server");
119             createServer(restConsumerProperties);
120             server.start();
121             while (!server.isAlive()) {
122                 ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
123             }
124         }
125
126         // Register this consumer with the REST server end point
127         RestServerEndpoint.registerApexRestServerConsumer(this.name, this);
128     }
129
130     private void createServer(RestServerCarrierTechnologyParameters restConsumerProperties) {
131
132         server = HttpServletServerFactoryInstance.getServerFactory().build(
133             restConsumerProperties.getName(),
134             restConsumerProperties.isHttps(),
135             restConsumerProperties.getHost(),
136             restConsumerProperties.getPort(),
137             restConsumerProperties.isSniHostCheck(),
138             null,
139             true,
140             false
141         );
142
143         if (restConsumerProperties.isAaf()) {
144             server.addFilterClass(null, ApexRestServerAafFilter.class.getName());
145         }
146         server.addServletClass(null, RestServerEndpoint.class.getName());
147         server.addServletClass(null, AccessControlFilter.class.getName());
148         server.setSerializationProvider(GsonMessageBodyHandler.class.getName());
149         if (null != restConsumerProperties.getUserName()
150             && null != restConsumerProperties.getPassword()) {
151             server.setBasicAuthentication(restConsumerProperties.getUserName(),
152                 restConsumerProperties.getPassword(), null);
153         }
154     }
155
156     /**
157      * Receive an event for processing in Apex.
158      *
159      * @param event the event to receive
160      * @return the response from Apex
161      */
162     public Response receiveEvent(final String event) {
163         // Get an execution ID for the event
164         final long executionId = getNextExecutionId();
165
166         if (LOGGER.isDebugEnabled()) {
167             String message = name + ": sending event " + name + '_' + executionId + " to Apex, event=" + event;
168             LOGGER.debug(message);
169         }
170
171         try {
172             // Send the event into Apex
173             eventReceiver.receiveEvent(executionId, new Properties(), event);
174         } catch (final Exception e) {
175             final String errorMessage = "error receiving events on event consumer " + name;
176             LOGGER.warn(errorMessage, e);
177             return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
178                     .entity("{'errorMessage', '" + errorMessage + ", " + e.getMessage() + "'}").build();
179         }
180
181         final var synchronousEventCache =
182                 (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
183         // Wait until the event is in the cache of events sent to apex
184         do {
185             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
186         } while (!synchronousEventCache.existsEventToApex(executionId));
187
188         // Now wait for the reply or for the event to time put
189         do {
190             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
191
192             // Check if we have received an answer from Apex
193             if (synchronousEventCache.existsEventFromApex(executionId)) {
194                 // We have received a response event, read and remove the response event and remove the sent event from
195                 // the cache
196                 final Object responseEvent = synchronousEventCache.removeCachedEventFromApexIfExists(executionId);
197                 synchronousEventCache.removeCachedEventToApexIfExists(executionId);
198
199                 // Return the event as a response to the call
200                 return Response.status(Response.Status.OK.getStatusCode()).entity(responseEvent.toString()).build();
201             }
202         } while (synchronousEventCache.existsEventToApex(executionId));
203
204         // The event timed out
205         final String errorMessage = "processing of event on event consumer " + name + " timed out, event=" + event;
206         LOGGER.warn(errorMessage);
207         return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
208                 .entity("{'errorMessage', '" + errorMessage + "'}").build();
209     }
210
211     /**
212      * {@inheritDoc}.
213      */
214     @Override
215     public void run() {
216         // Keep the consumer thread alive until it is shut down. We do not currently do anything in the thread but may
217         // do supervision in the future
218         while (consumerThread.isAlive() && !stopOrderedFlag) {
219             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
220         }
221
222         if (server != null) {
223             server.shutdown();
224         }
225     }
226
227     /**
228      * {@inheritDoc}.
229      */
230     @Override
231     public void stop() {
232         stopOrderedFlag = true;
233
234         while (consumerThread.isAlive()) {
235             ThreadUtilities.sleep(REST_SERVER_CONSUMER_WAIT_SLEEP_TIME);
236         }
237         if (server != null) {
238             server.stop();
239         }
240     }
241 }