1194a2ce8a22bb2f4f3c364f34d647a4c89a6a58
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
23
24 import java.util.EnumMap;
25 import java.util.Map;
26 import java.util.Properties;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.TimeUnit;
30 import lombok.Getter;
31 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
32 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
33 import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
34 import org.onap.policy.apex.service.engine.event.ApexEventException;
35 import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
36 import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
37 import org.onap.policy.apex.service.engine.event.PeeredReference;
38 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
39 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * This class implements an Apex event consumer that receives events from its peered event requestor
45  * producer.
46  *
47  * @author Liam Fallon (liam.fallon@ericsson.com)
48  */
49 public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
50     // Get a reference to the logger
51     private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class);
52
53     // The amount of time to wait in milliseconds between checks that the consumer thread has
54     // stopped
55     private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50;
56
57     // The event receiver that will receive events from this consumer
58     private ApexEventReceiver eventReceiver;
59
60     // The name for this consumer
61     @Getter
62     private String name = null;
63
64     // The peer references for this event handler
65     private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
66             new EnumMap<>(EventHandlerPeeredMode.class);
67
68     // Temporary request holder for incoming event send requests
69     private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>();
70
71     // The consumer thread and stopping flag
72     private Thread consumerThread;
73     private boolean stopOrderedFlag = false;
74
75     // The number of events received to date
76     @Getter
77     private int eventsReceived = 0;
78
79     @Override
80     public void init(final String consumerName, final EventHandlerParameters consumerParameters,
81             final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
82         this.eventReceiver = incomingEventReceiver;
83         this.name = consumerName;
84
85         // Check and get the event requestor consumer properties
86         if (!(consumerParameters
87                 .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
88             final String errorMessage =
89                     "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")";
90             LOGGER.warn(errorMessage);
91             throw new ApexEventException(errorMessage);
92         }
93
94         // Check if we are in peered mode
95         if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
96             final String errorMessage = "event Requestor consumer (" + this.name
97                     + ") must run in peered requestor mode with a event Requestor producer";
98             LOGGER.warn(errorMessage);
99             throw new ApexEventException(errorMessage);
100         }
101
102     }
103
104     /**
105      * Receive an incoming event send request from the peered event Requestor producer and queue it.
106      *
107      * @param eventObject the incoming event to process
108      * @throws ApexEventRuntimeException on queueing errors
109      */
110     public void processEvent(final Object eventObject) {
111         // Push the event onto the queue for handling
112         try {
113             incomingEventRequestQueue.add(eventObject);
114         } catch (final Exception e) {
115             final String errorMessage =
116                     "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")";
117             LOGGER.warn(errorMessage, e);
118             throw new ApexEventRuntimeException(errorMessage);
119         }
120     }
121
122     /**
123      * {@inheritDoc}.
124      */
125     @Override
126     public void start() {
127         // Configure and start the event reception thread
128         final String threadName = this.getClass().getName() + ":" + this.name;
129         consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
130         consumerThread.setDaemon(true);
131         consumerThread.start();
132     }
133
134     /**
135      * {@inheritDoc}.
136      */
137     @Override
138     public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
139         return peerReferenceMap.get(peeredMode);
140     }
141
142     /**
143      * {@inheritDoc}.
144      */
145     @Override
146     public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
147         peerReferenceMap.put(peeredMode, peeredReference);
148     }
149
150     /**
151      * {@inheritDoc}.
152      */
153     @Override
154     public void run() {
155         // The endless loop that receives events using REST calls
156         while (consumerThread.isAlive() && !stopOrderedFlag) {
157             try {
158                 // Take the next event from the queue
159                 final Object eventObject =
160                         incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
161                 if (eventObject == null) {
162                     // Poll timed out, wait again
163                     continue;
164                 }
165
166                 // Send the event into Apex
167                 eventReceiver.receiveEvent(new Properties(), eventObject);
168
169                 eventsReceived++;
170             } catch (final InterruptedException e) {
171                 LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
172                 Thread.currentThread().interrupt();
173             } catch (final Exception e) {
174                 LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
175             }
176         }
177     }
178
179     /**
180      * {@inheritDoc}.
181      */
182     @Override
183     public void stop() {
184         stopOrderedFlag = true;
185
186         while (consumerThread.isAlive()) {
187             ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME);
188         }
189     }
190 }