Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / event / impl / eventrequestor / EventRequestorConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
22
23 import java.util.EnumMap;
24 import java.util.Map;
25 import java.util.Properties;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
30 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
31 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
32 import org.onap.policy.apex.service.engine.event.ApexEventException;
33 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
34 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
35 import org.onap.policy.apex.service.engine.event.PeeredReference;
36 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
37 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * This class implements an Apex event consumer that receives events from its peered event requestor
43  * producer.
44  *
45  * @author Liam Fallon (liam.fallon@ericsson.com)
46  */
47 public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
48     // Get a reference to the logger
49     private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class);
50
51     // The amount of time to wait in milliseconds between checks that the consumer thread has
52     // stopped
53     private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50;
54
55     // The event receiver that will receive events from this consumer
56     private ApexEventReceiver eventReceiver;
57
58     // The name for this consumer
59     private String name = null;
60
61     // The peer references for this event handler
62     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
63             new EnumMap<>(EventHandlerPeeredMode.class);
64
65     // Temporary request holder for incoming event send requests
66     private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>();
67
68     // The consumer thread and stopping flag
69     private Thread consumerThread;
70     private boolean stopOrderedFlag = false;
71
72     // The number of events received to date
73     private int eventsReceived = 0;
74
75     @Override
76     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
77             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
78         this.eventReceiver = incomingEventReceiver;
79         this.name = consumerName;
80
81         // Check and get the event requestor consumer properties
82         if (!(consumerParameters
83                 .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
84             final String errorMessage =
85                     "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")";
86             LOGGER.warn(errorMessage);
87             throw new ApexEventException(errorMessage);
88         }
89
90         // Check if we are in peered mode
91         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
92             final String errorMessage = "event Requestor consumer (" + this.name
93                     + ") must run in peered requestor mode with a event Requestor producer";
94             LOGGER.warn(errorMessage);
95             throw new ApexEventException(errorMessage);
96         }
97
98     }
99
100     /**
101      * Receive an incoming event send request from the peered event Requestor producer and queue it.
102      *
103      * @param eventObject the incoming event to process
104      * @throws ApexEventRuntimeException on queueing errors
105      */
106     public void processEvent(final Object eventObject) {
107         // Push the event onto the queue for handling
108         try {
109             incomingEventRequestQueue.add(eventObject);
110         } catch (final Exception e) {
111             final String errorMessage =
112                     "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")";
113             LOGGER.warn(errorMessage, e);
114             throw new ApexEventRuntimeException(errorMessage);
115         }
116     }
117
118     /**
119      * {@inheritDoc}.
120      */
121     @Override
122     public void start() {
123         // Configure and start the event reception thread
124         final String threadName = this.getClass().getName() + ":" + this.name;
125         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
126         consumerThread.setDaemon(true);
127         consumerThread.start();
128     }
129
130     /**
131      * {@inheritDoc}.
132      */
133     @Override
134     public String getName() {
135         return name;
136     }
137
138     /**
139      * Get the number of events received to date.
140      *
141      * @return the number of events received
142      */
143     public int getEventsReceived() {
144         return eventsReceived;
145     }
146
147     /**
148      * {@inheritDoc}.
149      */
150     @Override
151     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
152         return peerReferenceMap.get(peeredMode);
153     }
154
155     /**
156      * {@inheritDoc}.
157      */
158     @Override
159     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
160         peerReferenceMap.put(peeredMode, peeredReference);
161     }
162
163     /**
164      * {@inheritDoc}.
165      */
166     @Override
167     public void run() {
168         // The endless loop that receives events using REST calls
169         while (consumerThread.isAlive() && !stopOrderedFlag) {
170             try {
171                 // Take the next event from the queue
172                 final Object eventObject =
173                         incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
174                 if (eventObject == null) {
175                     // Poll timed out, wait again
176                     continue;
177                 }
178
179                 // Send the event into Apex
180                 eventReceiver.receiveEvent(new Properties(), eventObject);
181
182                 eventsReceived++;
183             } catch (final InterruptedException e) {
184                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
185                 Thread.currentThread().interrupt();
186             } catch (final Exception e) {
187                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
188             }
189         }
190     }
191
192     /**
193      * {@inheritDoc}.
194      */
195     @Override
196     public void stop() {
197         stopOrderedFlag = true;
198
199         while (consumerThread.isAlive()) {
200             ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME);
201         }
202     }
203 }