Fix Sonar bugs and possible NPE in code
[aai/spike.git] / src / main / java / org / onap / aai / spike / service / SpikeEventProcessor.java
/**
 * ============LICENSE_START=======================================================
 * org.onap.aai
 * ================================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017-2018 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.aai.spike.service;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import javax.naming.OperationNotSupportedException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventConsumer;
import org.onap.aai.event.api.EventPublisher;
import org.onap.aai.event.api.MessageWithOffset;
import org.onap.aai.spike.event.envelope.EventEnvelope;
import org.onap.aai.spike.event.envelope.EventEnvelopeParser;
import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
import org.onap.aai.spike.event.incoming.OffsetManager;
import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
import org.onap.aai.spike.logging.SpikeMsgs;
import org.onap.aai.spike.util.SpikeConstants;
import org.onap.aai.spike.util.SpikeProperties;

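/**
 * Periodically consumes raw graph events from the event bus, converts them into
 * ordered Spike events, and re-publishes them. As a {@link TimerTask}, an instance
 * is expected to be driven from a {@link java.util.Timer}; a minimal sketch
 * (illustrative only; the actual wiring lives in the service startup code, and the
 * timer name and period shown here are assumptions):
 *
 * <pre>
 * Timer timer = new Timer("spike-event-processor");
 * timer.schedule(new SpikeEventProcessor(consumer, publisher), 0, 30000);
 * </pre>
 */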
public class SpikeEventProcessor extends TimerTask {

    /**
     * Client used for consuming events from the event bus.
     */
    private EventConsumer consumer;
    /**
     * Client used for publishing events to the event bus.
     */
    private EventPublisher publisher;
    /**
     * Internal queue where outgoing events will be buffered until they can be serviced by the event
     * publisher worker threads.
     */
    private BlockingQueue<SpikeGraphEvent> eventQueue;

    private Integer eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY;
    private Integer eventOffsetPeriod = DEFAULT_EVENT_OFFSET_COMMIT_PERIOD;

    private OffsetManager offsetManager;
    private Long lastCommittedOffset = null;
    private EventEnvelopeParser eventEnvelopeParser;

    /**
     * Maximum number of events that can be queued up for publishing before the queue is considered
     * full.
     */
    private static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
    private static final Integer DEFAULT_EVENT_OFFSET_COMMIT_PERIOD = 10000;

    private static final Logger logger = LoggerFactory.getInstance().getLogger(SpikeEventProcessor.class.getName());
    private static final Logger auditLogger =
            LoggerFactory.getInstance().getAuditLogger(SpikeEventProcessor.class.getName());
    private static final Gson gson =
            new GsonBuilder().setExclusionStrategies(new SpikeEventExclusionStrategy()).setPrettyPrinting().create();
    public SpikeEventProcessor(EventConsumer consumer, EventPublisher publisher) {
        this.consumer = consumer;
        this.publisher = publisher;

        try {
            eventQueueCapacity = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_CAPACITY));
            eventOffsetPeriod = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_OFFSET_CHECK_PERIOD));

        } catch (Exception ex) {
            // The properties are optional; fall back to the compiled-in defaults.
        }

        eventQueue = new PriorityBlockingQueue<>(eventQueueCapacity, new SpikeEventComparator());
        new Thread(new SpikeEventPublisher()).start();

        // Instantiate the offset manager. This will run a background thread that
        // periodically updates the value of the most recent offset value that can
        // be safely committed with the event bus.
        offsetManager = new OffsetManager(eventQueueCapacity, eventOffsetPeriod);
        eventEnvelopeParser = new EventEnvelopeParser();
    }

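    /**
     * Consumes the next batch of events from the event bus, queues them for ordered
     * publication, and then commits the most recent 'safe' offset, where the underlying
     * consumer supports offset management.
     */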
    @Override
    public void run() {
        logger.info(SpikeMsgs.SPIKE_QUERY_EVENT_SYSTEM);

        if (consumer == null) {
            logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
            return;
        }

        Iterable<MessageWithOffset> events = null;
        try {
            events = consumer.consumeWithOffsets();

        } catch (OperationNotSupportedException e) {
            // This means we are using DMaaP and can't use offsets
            try {
                Iterable<String> tempEvents = consumer.consume();
                ArrayList<MessageWithOffset> messages = new ArrayList<>();
                for (String event : tempEvents) {
                    messages.add(new MessageWithOffset(0, event));
                }
                events = messages;
            } catch (Exception e1) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e1.getMessage());
                return;
            }
        } catch (Exception e) {
            logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
            return;
        }

        if (events == null || !events.iterator().hasNext()) {
            logger.info(SpikeMsgs.SPIKE_NO_EVENT_RECEIVED);
            // Normalize a null result to an empty batch so that the iteration and the
            // offset commit logic below can run without risking a NullPointerException.
            events = new ArrayList<>();
        }

        for (MessageWithOffset event : events) {
            try {
                logger.debug(SpikeMsgs.SPIKE_EVENT_RECEIVED, event.getMessage());

                GizmoGraphEvent modelEvent = eventEnvelopeParser.parseEvent(event.getMessage());
                auditLogger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED,
                        "of type: " + modelEvent.getObjectType() + " with key: " + modelEvent.getObjectKey()
                                + " , transaction-id: " + modelEvent.getTransactionId());
                logger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, "of type: " + modelEvent.getObjectType() + " with key: "
                        + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());

                String modelEventJson = gson.toJson(modelEvent);

                // Log the current event as 'being processed' with the offset manager so that we know that
                // its associated offset is not yet safe to be committed as 'done'.
                offsetManager.cacheEvent(modelEvent.getTransactionId(), event.getOffset());

                while (eventQueue.size() >= eventQueueCapacity) {
                    // Wait until there's room in the queue
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE,
                            "Event could not be published to the event bus due to: Internal buffer capacity exceeded. Waiting 10 seconds.");
                    Thread.sleep(10000);
                }

                // The queue is an unbounded PriorityBlockingQueue, so offer() always succeeds;
                // the capacity check above provides the effective bound.
                eventQueue.offer(modelEvent.toSpikeGraphEvent());

                logger.info(SpikeMsgs.SPIKE_EVENT_PROCESSED, "of type: " + modelEvent.getObjectType() + " with key: "
                        + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
                logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);

            } catch (InterruptedException e) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                        e.getMessage() + ".  Incoming event payload:\n" + event.getMessage());
                // Restore the interrupted status...
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
                        e.getMessage() + ".  Incoming event payload:\n" + event.getMessage());
            }
        }

        try {

            // Get the next 'safe' offset to be committed from the offset manager.
            // We need to do this here instead of letting the offset manager just take care
            // of it for us because the event consumer is not thread safe. If we try to
            // commit the offsets from another thread, it gets unhappy...
            Long nextOffset = offsetManager.getNextOffsetToCommit();

            // Make sure we actually have a real value...
            if (nextOffset != null) {

                // There is no point in continually committing the same offset value, so make sure
                // that something has actually changed before we do anything...
                if ((lastCommittedOffset == null) || (!lastCommittedOffset.equals(nextOffset))) {

                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Committing offset: " + nextOffset + " to the event bus for Champ raw event topic.");
                    }

                    // OK, let's commit the latest value...
                    consumer.commitOffsets(nextOffset);
                    lastCommittedOffset = nextOffset;
                }
            }
        } catch (OperationNotSupportedException e) {
            // We must be working with a DMaaP client which doesn't support offset management.
            // Swallow the exception.
        } catch (Exception e) {
            logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
        }
    }

    /**
     * This class implements the thread that is responsible for buffering the events in memory and
     * ordering them before publishing them to the topic.
     * <p>
     * Each publish operation is performed synchronously, so that the thread will only move on to the
     * next available event once it has actually published the current event to the bus.
     */
    private class SpikeEventPublisher implements Runnable {

        /**
         * Partition key to use when publishing events to the event stream. We WANT all events to go to a
         * single partition, so we are just using a hard-coded key for every event.
         */
        private static final String EVENTS_PARTITION_KEY = "SpikeEventKey";
        private static final int DEFAULT_EVENT_QUEUE_DELAY = 10000;

        Integer eventQueueDelay = DEFAULT_EVENT_QUEUE_DELAY;

        public SpikeEventPublisher() {
            try {
                eventQueueDelay = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_DELAY));
            } catch (Exception ex) {
                // The delay property is optional; fall back to the compiled-in default.
            }
        }

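        /**
         * Continuously drains the priority queue, publishing each event synchronously and
         * reporting each successful publish to the offset manager. This is the only thread
         * that takes from the event queue.
         */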
        @Override
        public void run() {
            while (true) {

                SpikeGraphEvent nextEvent;
                SpikeGraphEvent event = null;
                try {

                    // Get the next event to be published from the queue if it is old enough or we have too
                    // many items in the queue
                    if ((nextEvent = eventQueue.peek()) != null
                            && (System.currentTimeMillis() - nextEvent.getSpikeTimestamp() > eventQueueDelay
                                    || eventQueue.size() > eventQueueCapacity)) {
                        event = eventQueue.take();
                    } else {
                        // Small pause so that we aren't burning CPU
                        Thread.sleep(200);
                        continue;
                    }

                } catch (InterruptedException e) {
                    // Restore the interrupted status.
                    Thread.currentThread().interrupt();
                    continue;
                }

                // Try publishing the event to the event bus. This call will block
                // until the event is published or times out.
                try {
                    String eventJson = gson.toJson(new EventEnvelope(event));
                    int sentMessageCount = publisher.sendSync(EVENTS_PARTITION_KEY, eventJson);
                    if (sentMessageCount > 0) {
                        logger.info(SpikeMsgs.SPIKE_EVENT_PUBLISHED, "of type: " + event.getObjectType() + " with key: "
                                + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
                        logger.debug(SpikeMsgs.SPIKE_EVENT_PUBLISHED, eventJson);
                    } else {
                        logger.warn(SpikeMsgs.SPIKE_PUBLISH_FAILED, "of type: " + event.getObjectType() + " with key: "
                                + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
                        logger.debug(SpikeMsgs.SPIKE_PUBLISH_FAILED, eventJson);
                    }

                    // Inform the offset manager that this event has been published. Its offset
                    // can now, potentially, be safely committed to the event bus so that on a
                    // restart we won't reprocess it.
                    offsetManager.markAsPublished(event.getTransactionId());

                } catch (ExecutionException e) {

                    // Publish timed out; queue it up to retry again. Since this message was pulled from the
                    // top of the queue, it will go back to the top.
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, "Retrying in 60 seconds. " + e.getMessage());
                    eventQueue.offer(event);

                    try {
                        Thread.sleep(60000);
                    } catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                } catch (Exception e) {
                    logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());
                }
            }
        }
    }

}