4581b179c3f0c98a765dabd9cd7faa267d3b6c4b
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / impl / eventrequestor / EventRequestorConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
31 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
32 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
33 import org.onap.policy.apex.service.engine.event.ApexEventException;
34 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
35 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
36 import org.onap.policy.apex.service.engine.event.PeeredReference;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * This class implements an Apex event consumer that receives events from its peered event requestor
44  * producer.
45  *
46  * @author Liam Fallon (liam.fallon@ericsson.com)
47  */
48 public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
49     // Get a reference to the logger
50     private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class);
51
52     // The amount of time to wait in milliseconds between checks that the consumer thread has
53     // stopped
54     private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50;
55
56     // The event receiver that will receive events from this consumer
57     private ApexEventReceiver eventReceiver;
58
59     // The name for this consumer
60     private String name = null;
61
62     // The peer references for this event handler
63     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
64             new EnumMap<>(EventHandlerPeeredMode.class);
65
66     // Temporary request holder for incoming event send requests
67     private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>();
68
69     // The consumer thread and stopping flag
70     private Thread consumerThread;
71     private boolean stopOrderedFlag = false;
72
73     // The number of events received to date
74     private int eventsReceived = 0;
75
76     @Override
77     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
78             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
79         this.eventReceiver = incomingEventReceiver;
80         this.name = consumerName;
81
82         // Check and get the event requestor consumer properties
83         if (!(consumerParameters
84                 .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
85             final String errorMessage =
86                     "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")";
87             LOGGER.warn(errorMessage);
88             throw new ApexEventException(errorMessage);
89         }
90
91         // Check if we are in peered mode
92         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
93             final String errorMessage = "event Requestor consumer (" + this.name
94                     + ") must run in peered requestor mode with a event Requestor producer";
95             LOGGER.warn(errorMessage);
96             throw new ApexEventException(errorMessage);
97         }
98
99     }
100
101     /**
102      * Receive an incoming event send request from the peered event Requestor producer and queue it.
103      *
104      * @param eventObject the incoming event to process
105      * @throws ApexEventRuntimeException on queueing errors
106      */
107     public void processEvent(final Object eventObject) {
108         // Push the event onto the queue for handling
109         try {
110             incomingEventRequestQueue.add(eventObject);
111         } catch (final Exception e) {
112             final String errorMessage =
113                     "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")";
114             LOGGER.warn(errorMessage, e);
115             throw new ApexEventRuntimeException(errorMessage);
116         }
117     }
118
119     /**
120      * {@inheritDoc}.
121      */
122     @Override
123     public void start() {
124         // Configure and start the event reception thread
125         final String threadName = this.getClass().getName() + ":" + this.name;
126         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
127         consumerThread.setDaemon(true);
128         consumerThread.start();
129     }
130
131     /**
132      * {@inheritDoc}.
133      */
134     @Override
135     public String getName() {
136         return name;
137     }
138
139     /**
140      * Get the number of events received to date.
141      *
142      * @return the number of events received
143      */
144     public int getEventsReceived() {
145         return eventsReceived;
146     }
147
148     /**
149      * {@inheritDoc}.
150      */
151     @Override
152     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
153         return peerReferenceMap.get(peeredMode);
154     }
155
156     /**
157      * {@inheritDoc}.
158      */
159     @Override
160     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
161         peerReferenceMap.put(peeredMode, peeredReference);
162     }
163
164     /**
165      * {@inheritDoc}.
166      */
167     @Override
168     public void run() {
169         // The endless loop that receives events using REST calls
170         while (consumerThread.isAlive() && !stopOrderedFlag) {
171             try {
172                 // Take the next event from the queue
173                 final Object eventObject =
174                         incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
175                 if (eventObject == null) {
176                     // Poll timed out, wait again
177                     continue;
178                 }
179
180                 // Send the event into Apex
181                 eventReceiver.receiveEvent(new Properties(), eventObject);
182
183                 eventsReceived++;
184             } catch (final InterruptedException e) {
185                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
186                 Thread.currentThread().interrupt();
187             } catch (final Exception e) {
188                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
189             }
190         }
191     }
192
193     /**
194      * {@inheritDoc}.
195      */
196     @Override
197     public void stop() {
198         stopOrderedFlag = true;
199
200         while (consumerThread.isAlive()) {
201             ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME);
202         }
203     }
204 }