/**
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017-2018 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.aai.spike.service;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import javax.naming.OperationNotSupportedException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
import org.onap.aai.event.api.EventPublisher;
import org.onap.aai.event.api.MessageWithOffset;
import org.onap.aai.spike.event.envelope.EventEnvelope;
import org.onap.aai.spike.event.envelope.EventEnvelopeParser;
import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
import org.onap.aai.spike.event.incoming.OffsetManager;
import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
import org.onap.aai.spike.logging.SpikeMsgs;
import org.onap.aai.spike.util.SpikeConstants;
import org.onap.aai.spike.util.SpikeProperties;
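
/**
 * TimerTask that consumes raw graph events from the event bus, transforms them into Spike graph
 * events, and buffers them for ordered, synchronous publication back onto the event bus while
 * tracking which consumer offsets are safe to commit.
 */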
public class SpikeEventProcessor extends TimerTask {

    /**
     * Client used for consuming events from the event bus.
     */
    private EventConsumer consumer;

    /**
     * Client used for publishing events to the event bus.
     */
    private EventPublisher publisher;

    /**
     * Internal queue where outgoing events will be buffered until they can be serviced by the event
     * publisher worker thread.
     */
    private BlockingQueue<SpikeGraphEvent> eventQueue;

    private Integer eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY;
    private Integer eventOffsetPeriod = DEFAULT_EVENT_OFFSET_COMMIT_PERIOD;

    private OffsetManager offsetManager;
    private Long lastCommittedOffset = null;
    private EventEnvelopeParser eventEnvelopeParser;

    /**
     * Number of events that can be queued up for publishing before new events are dropped.
     */
    private static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
    private static final Integer DEFAULT_EVENT_OFFSET_COMMIT_PERIOD = 10000;

    private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeEventProcessor.class.getName());
    private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(SpikeEventProcessor.class.getName());
    private static final Gson gson =
            new GsonBuilder().setExclusionStrategies(new SpikeEventExclusionStrategy()).setPrettyPrinting().create();

    public SpikeEventProcessor(EventConsumer consumer, EventPublisher publisher) {
        this.consumer = consumer;
        this.publisher = publisher;

        try {
            eventQueueCapacity = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_CAPACITY));
            eventOffsetPeriod = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_OFFSET_CHECK_PERIOD));
        } catch (Exception ex) {
            // Fall back to the default values if the properties are missing or malformed.
        }

        eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator());
        new Thread(new SpikeEventPublisher()).start();

        // Instantiate the offset manager. This will run a background thread that
        // periodically updates the most recent offset value that can be safely
        // committed to the event bus.
        offsetManager = new OffsetManager(eventQueueCapacity, eventOffsetPeriod);
        eventEnvelopeParser = new EventEnvelopeParser();
    }
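
    /**
     * Invoked on each timer tick. Consumes the next batch of events from the event bus, queues
     * them for ordered publication, and commits the most recent offset that has been fully
     * processed.
     */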
    @Override
    public void run() {
        logger.info(SpikeMsgs.SPIKE_QUERY_EVENT_SYSTEM);

        if (consumer == null) {
            logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
            return;
        }

        Iterable<MessageWithOffset> events = null;
        try {
            events = consumer.consumeWithOffsets();

        } catch (OperationNotSupportedException e) {
            // This means we are using DMaaP and can't use offsets
            try {
                Iterable<String> tempEvents = consumer.consume();
                ArrayList<MessageWithOffset> messages = new ArrayList<MessageWithOffset>();
                for (String event : tempEvents) {
                    messages.add(new MessageWithOffset(0, event));
                }
                events = messages;
            } catch (Exception e1) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e1.getMessage());
                return;
            }
        } catch (Exception e) {
            logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
            return;
        }

        if (events == null || !events.iterator().hasNext()) {
            logger.info(SpikeMsgs.SPIKE_NO_EVENT_RECEIVED);
            return;
        }

        for (MessageWithOffset event : events) {
            try {
                logger.debug(SpikeMsgs.SPIKE_EVENT_RECEIVED, event.getMessage());

                GizmoGraphEvent modelEvent = eventEnvelopeParser.parseEvent(event.getMessage());
                auditLogger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED,
                        "of type: " + modelEvent.getObjectType() + " with key: " + modelEvent.getObjectKey()
                                + " , transaction-id: " + modelEvent.getTransactionId());
                logger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, "of type: " + modelEvent.getObjectType() + " with key: "
                        + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());

                String modelEventJson = gson.toJson(modelEvent);

                // Log the current event as 'being processed' with the offset manager so that we know that its
                // associated offset is not yet safe to be committed as 'done'.
                offsetManager.cacheEvent(modelEvent.getTransactionId(), event.getOffset());

                while (eventQueue.size() >= eventQueueCapacity) {
                    // Wait until there's room in the queue.
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE,
                            "Event could not be published to the event bus due to: Internal buffer capacity exceeded. Waiting 10 seconds.");
                    Thread.sleep(10000);
                }

                eventQueue.offer(modelEvent.toSpikeGraphEvent());

                logger.info(SpikeMsgs.SPIKE_EVENT_PROCESSED, "of type: " + modelEvent.getObjectType() + " with key: "
                        + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
                logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);

            } catch (InterruptedException e) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                        e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
                // Restore the interrupted status...
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                        e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
            }
        }

        try {

            // Get the next 'safe' offset to be committed from the offset manager.
            // We need to do this here instead of letting the offset manager just take care
            // of it for us because the event consumer is not thread safe. If we try to
            // commit the offsets from another thread, it gets unhappy...
            Long nextOffset = offsetManager.getNextOffsetToCommit();

            // Make sure we actually have a real value...
            if (nextOffset != null) {

                // There is no point in continually committing the same offset value, so make sure
                // that something has actually changed before we do anything...
                if ((lastCommittedOffset == null) || (!lastCommittedOffset.equals(nextOffset))) {

                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Committing offset: " + nextOffset + " to the event bus for Champ raw event topic.");
                    }

                    // OK, let's commit the latest value...
                    consumer.commitOffsets(nextOffset);
                    lastCommittedOffset = nextOffset;
                }
            }

        } catch (OperationNotSupportedException e) {
            // We must be working with a DMaaP implementation which doesn't support offset management.
            // Swallow the exception.
        } catch (Exception e) {
            logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
        }
    }

    /**
     * This class implements the thread that is responsible for buffering events in memory and
     * ordering them before publishing them to the event topic.
     *
     * Each publish operation is performed synchronously, so that the thread will only move on to the
     * next available event once it has actually published the current event to the bus.
     */
    private class SpikeEventPublisher implements Runnable {

        /**
         * Partition key to use when publishing events to the event stream. We WANT all events to go to a
         * single partition, so we are just using a hard-coded key for every event.
         */
        private static final String EVENTS_PARTITION_KEY = "SpikeEventKey";
        private static final int DEFAULT_EVENT_QUEUE_DELAY = 10000;

        Integer eventQueueDelay = DEFAULT_EVENT_QUEUE_DELAY;

        public SpikeEventPublisher() {
            try {
                eventQueueDelay = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_DELAY));
            } catch (Exception ex) {
                // Fall back to the default delay if the property is missing or malformed.
            }
        }
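
        /**
         * Continuously drains the internal priority queue, publishing each event synchronously to
         * the event bus once it has aged past the configured queue delay or the queue has grown
         * past its capacity.
         */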
        @Override
        public void run() {
            while (true) {
                SpikeGraphEvent nextEvent;
                SpikeGraphEvent event = null;
                try {
                    // Get the next event to be published from the queue if it is old enough or we have too
                    // many items in the queue.
                    if ((nextEvent = eventQueue.peek()) != null
                            && (System.currentTimeMillis() - nextEvent.getSpikeTimestamp() > eventQueueDelay
                                    || eventQueue.size() > eventQueueCapacity)) {
                        event = eventQueue.take();
                    } else {
                        // Small pause so that we aren't burning CPU.
                        Thread.sleep(1000);
                        continue;
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupted status.
                    Thread.currentThread().interrupt();
                    continue;
                }

                // Try publishing the event to the event bus. This call will block
                // until the event is published or times out.
                try {
                    String eventJson = gson.toJson(new EventEnvelope(event));
                    int sentMessageCount = publisher.sendSync(EVENTS_PARTITION_KEY, eventJson);
                    if (sentMessageCount > 0) {
                        logger.info(SpikeMsgs.SPIKE_EVENT_PUBLISHED, "of type: " + event.getObjectType() + " with key: "
                                + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
                        logger.debug(SpikeMsgs.SPIKE_EVENT_PUBLISHED, eventJson);
                    } else {
                        logger.warn(SpikeMsgs.SPIKE_PUBLISH_FAILED, "of type: " + event.getObjectType() + " with key: "
                                + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
                        logger.debug(SpikeMsgs.SPIKE_PUBLISH_FAILED, eventJson);
                    }

                    // Inform the offset manager that this event has been published. Its offset
                    // can now, potentially, be safely committed to the event bus so that on a
                    // restart we won't reprocess it.
                    offsetManager.markAsPublished(event.getTransactionId());

                } catch (ExecutionException e) {

                    // Publish timed out; queue the event up to be retried. Since this message was pulled
                    // from the top of the queue, it will go back to the top.
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, "Retrying in 60 seconds. " + e.getMessage());
                    eventQueue.offer(event);

                    try {
                        Thread.sleep(60000);
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                } catch (Exception e) {
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());
                }
            }
        }
    }
}