8d845e0ced0bf87f59734ad765e19d808929f577
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28
29 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
32 import org.onap.policy.apex.service.engine.event.ApexEventException;
33 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35 import org.onap.policy.apex.service.engine.event.PeeredReference;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * This class implements an Apex event consumer that receives events from its peered event requestor
43  * producer.
44  *
45  * @author Liam Fallon (liam.fallon@ericsson.com)
46  */
47 public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
48     // Get a reference to the logger
49     private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class);
50
51     // The amount of time to wait in milliseconds between checks that the consumer thread has
52     // stopped
53     private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50;
54
55     // The event receiver that will receive events from this consumer
56     private ApexEventReceiver eventReceiver;
57
58     // The name for this consumer
59     private String name = null;
60
61     // The peer references for this event handler
62     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
63             new EnumMap<>(EventHandlerPeeredMode.class);
64
65     // Temporary request holder for incoming event send requests
66     private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>();
67
68     // The consumer thread and stopping flag
69     private Thread consumerThread;
70     private boolean stopOrderedFlag = false;
71
72     // The number of events received to date
73     private int eventsReceived = 0;
74
75     @Override
76     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
77             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
78         this.eventReceiver = incomingEventReceiver;
79         this.name = consumerName;
80
81         // Check and get the event requestor consumer properties
82         if (!(consumerParameters
83                 .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
84             final String errorMessage =
85                     "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")";
86             LOGGER.warn(errorMessage);
87             throw new ApexEventException(errorMessage);
88         }
89
90         // Check if we are in peered mode
91         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
92             final String errorMessage = "event Requestor consumer (" + this.name
93                     + ") must run in peered requestor mode with a event Requestor producer";
94             LOGGER.warn(errorMessage);
95             throw new ApexEventException(errorMessage);
96         }
97
98     }
99
100     /**
101      * Receive an incoming event send request from the peered event Requestor producer and queue it.
102      *
103      * @param eventObject the incoming event to process
104      * @throws ApexEventRuntimeException on queueing errors
105      */
106     public void processEvent(final Object eventObject) {
107         // Push the event onto the queue for handling
108         try {
109             incomingEventRequestQueue.add(eventObject);
110         } catch (final Exception e) {
111             final String errorMessage =
112                     "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")";
113             LOGGER.warn(errorMessage, e);
114             throw new ApexEventRuntimeException(errorMessage);
115         }
116     }
117
118     /*
119      * (non-Javadoc)
120      *
121      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
122      */
123     @Override
124     public void start() {
125         // Configure and start the event reception thread
126         final String threadName = this.getClass().getName() + ":" + this.name;
127         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
128         consumerThread.setDaemon(true);
129         consumerThread.start();
130     }
131
132     /*
133      * (non-Javadoc)
134      *
135      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
136      */
137     @Override
138     public String getName() {
139         return name;
140     }
141
142     /**
143      * Get the number of events received to date.
144      *
145      * @return the number of events received
146      */
147     public int getEventsReceived() {
148         return eventsReceived;
149     }
150
151     /*
152      * (non-Javadoc)
153      *
154      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
155      * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
156      */
157     @Override
158     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
159         return peerReferenceMap.get(peeredMode);
160     }
161
162     /*
163      * (non-Javadoc)
164      *
165      * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
166      * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
167      * org.onap.policy.apex.service.engine.event.PeeredReference)
168      */
169     @Override
170     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
171         peerReferenceMap.put(peeredMode, peeredReference);
172     }
173
174     /*
175      * (non-Javadoc)
176      *
177      * @see java.lang.Runnable#run()
178      */
179     @Override
180     public void run() {
181         // The endless loop that receives events using REST calls
182         while (consumerThread.isAlive() && !stopOrderedFlag) {
183             try {
184                 // Take the next event from the queue
185                 final Object eventObject =
186                         incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
187                 if (eventObject == null) {
188                     // Poll timed out, wait again
189                     continue;
190                 }
191
192                 // Send the event into Apex
193                 eventReceiver.receiveEvent(null, eventObject);
194
195                 eventsReceived++;
196             } catch (final InterruptedException e) {
197                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
198                 Thread.currentThread().interrupt();
199             } catch (final Exception e) {
200                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
201             }
202         }
203     }
204
205     /*
206      * (non-Javadoc)
207      *
208      * @see org.onap.policy.apex.apps.uservice.producer.ApexEventConsumer#stop()
209      */
210     @Override
211     public void stop() {
212         stopOrderedFlag = true;
213
214         while (consumerThread.isAlive()) {
215             ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME);
216         }
217     }
218 }