Merge "Updated champ-lib to use the correct logger"
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampCoreMsgs;
38 import org.onap.aai.champcore.ChampGraph;
39 import org.onap.aai.champcore.ChampTransaction;
40 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
41 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
42 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
44 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
46 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
47 import org.onap.aai.champcore.exceptions.ChampTransactionException;
48 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
49 import org.onap.aai.champcore.model.ChampObject;
50 import org.onap.aai.champcore.model.ChampObjectConstraint;
51 import org.onap.aai.champcore.model.ChampObjectIndex;
52 import org.onap.aai.champcore.model.ChampPartition;
53 import org.onap.aai.champcore.model.ChampRelationship;
54 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
55 import org.onap.aai.champcore.model.ChampRelationshipIndex;
56 import org.onap.aai.champcore.model.ChampSchema;
57 import org.onap.aai.cl.api.Logger;
58 import org.onap.aai.cl.eelf.LoggerFactory;
59 import org.onap.aai.event.api.EventPublisher;
60
61
62
63
64 /**
65  * This class provides the hooks to allow Champ operations to be logged to an event
66  * stream.
67  */
68 public abstract class AbstractLoggingChampGraph implements ChampGraph {
69
70   public static final String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
71   public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
72   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
73   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
74   public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
75   protected static final String KEY_PROPERTY_NAME = "aai-uuid";
76   protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";
77
78   /** Pool of worker threads that do the work of publishing the events to the event bus. */
79   protected ThreadPoolExecutor publisherPool;
80
81   /** Client used for publishing events to the event bus. */
82   protected EventPublisher producer;
83
84   /** Internal queue where outgoing events will be buffered until they can be serviced by
85    *  the event publisher worker threads. */
86   protected BlockingQueue<ChampEvent> eventQueue;
87
88   /** Number of events that can be queued up for publication before we begin dropping
89    *  events. */
90   private Integer eventQueueCapacity;
91
92   /** Number of event publisher worker threads. */
93   private Integer eventStreamPublisherPoolSize;
94
95   private static final Logger logger = LoggerFactory.getInstance().getLogger(AbstractLoggingChampGraph.class);
96
97
98   /**
99    * Create a new instance of the AbstractLoggingChampGraph.
100    *
101    * @param properties - Set of configuration properties for this graph instance.
102    */
103   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
104
105     // Extract the necessary parameters from the configuration properties.
106     configure(properties);
107
108     // Make sure we were passed an event producer as one of our properties, otherwise
109     // there is really nothing more we can do...
110     if(producer == null) {
111       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
112           "No event stream producer was supplied.");
113       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
114           "NOTE!! Champ events will NOT be published to the event stream!");
115       return;
116     }
117
118     // Create the blocking queue that we will use to buffer events that we want
119     // published to the event bus.
120     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
121
122     // Create the executor pool that will do the work of publishing events to the event bus.
123     publisherPool =
124         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
125             new ProducerWorkerThreadFactory());
126
127     try {
128
129       // Start up the producer worker threads.
130       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
131         publisherPool.submit(new EventPublisherWorker());
132       }
133
134     } catch (Exception e) {
135
136       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
137           "Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
138       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
139           "NOTE!! Champ events may NOT be published to the event stream!");
140       return;
141     }
142   }
143
144
145
146   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
147   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
148   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
149   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
150   @Override
151   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
152   @Override
153   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
154   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
155   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
156   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
157
158   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
159
160   /**
161     * Creates or updates a vertex in the graph data store.
162     * <p>
163     * If a transaction context is not provided, then a transaction will be automatically
164     * created and committed for this operation only, otherwise, the supplied transaction
165     * will be used and it will be up to the caller to commit the transaction at its
166     * discretion.
167     *
168     * @param object      - The vertex to be created or updated.
169     * @param transaction - Optional transaction context to perform the operation in.
170     *
171     * @return - The vertex, as created, marshaled as a {@link ChampObject}
172     *
173     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
174     *                                         into the backend representation
175     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
176     *                                         by {@link ChampGraph#retrieveSchema}
177     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
178     *                                         is not present or object not found in the graph
179     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
180     */
181   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
182
183   /**
184    * Updates an existing vertex in the graph store.
185    * <p>
186    * If a transaction context is not provided, then a transaction will be automatically
187    * created and committed for this operation only, otherwise, the supplied transaction
188    * will be used and it will be up to the caller to commit the transaction at its
189    * discretion.
190    *
191    * @param object      - The vertex to be created or updated.
192    * @param transaction - Optional transaction context to perform the operation in.
193    *
194    * @return - The updated vertex, marshaled as a {@link ChampObject}
195    *
196    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
197    *                                         the backend representation
198    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
199    *                                         by {@link ChampGraph#retrieveSchema}
200    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
201    *                                         is not present or object not found in the graph
202    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
203    */
204   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
205
206   /**
207    * Deletes an existing vertex from the graph store.
208    * <p>
209    * If a transaction context is not provided, then a transaction will be automatically
210    * created and committed for this operation only, otherwise, the supplied transaction
211    * will be used and it will be up to the caller to commit the transaction at its
212    * discretion.
213    *
214    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
215    * @param transaction - Optional transaction context to perform the operation in.
216    *
217    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
218    *                                         is not present or object not found in the graph
219    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
220    */
221   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
222
223   /**
224    * Creates or updates an edge in the graph data store.
225    * <p>
226    * If a transaction context is not provided, then a transaction will be automatically
227    * created and committed for this operation only, otherwise, the supplied transaction
228    * will be used and it will be up to the caller to commit the transaction at its
229    * discretion.
230    *
231    * @param relationship - The ChampRelationship that you wish to store in the graph
232    * @param transaction  - Optional transaction context to perform the operation in.
233    *
234    * @return - The {@link ChampRelationship} as it was stored.
235    *
236    * @throws ChampUnmarshallingException         - If the edge which was created could not be
237    *                                               unmarshalled into a ChampRelationship
238    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
239    *                                               marshalled into the backend representation
240    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
241    *                                               by this relationship does not exist in the graph
242    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
243    *                                               specifed by {@link ChampGraph#retrieveSchema}
244    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
245    *                                               but the object cannot be found in the graph
246    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
247    */
248   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
249
250   /**
251    * Replaces an existing edge in the graph data store.
252    * <p>
253    * If a transaction context is not provided, then a transaction will be automatically
254    * created and committed for this operation only, otherwise, the supplied transaction
255    * will be used and it will be up to the caller to commit the transaction at its
256    * discretion.
257    *
258    * @param relationship  - The ChampRelationship that you wish to replace in the graph
259    * @param transaction   - Optional transaction context to perform the operation in.
260    *
261    * @return - The {@link ChampRelationship} as it was stored.
262    *
263    * @throws ChampUnmarshallingException         - If the edge which was created could not be
264    *                                               unmarshalled into a ChampRelationship
265    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
266    *                                               marshalled into the backend representation
267    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
268    *                                               specifed by {@link ChampGraph#retrieveSchema}
269    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
270    *                                               but the object cannot be found in the graph
271    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
272    */
273   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
274
275   /**
276    * Removes an edge from the graph data store.
277    * <p>
278    * If a transaction context is not provided, then a transaction will be automatically
279    * created and committed for this operation only, otherwise, the supplied transaction
280    * will be used and it will be up to the caller to commit the transaction at its
281    * discretion.
282    *
283    * @param relationship - The ChampRelationship that you wish to remove from the graph.
284    * @param transaction  - Optional transaction context to perform the operation in.
285    *
286    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
287    *                                               but the object cannot be found in the graph
288    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
289    */
290   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
291
292   /**
293    * Create or update a {@link ChampPartition}.
294    * <p>
295    * If a transaction context is not provided, then a transaction will be automatically
296    * created and committed for this operation only, otherwise, the supplied transaction
297    * will be used and it will be up to the caller to commit the transaction at its
298    * discretion.
299    *
300    * @param partition   - The ChampPartition that you wish to create or update in the graph.
301    * @param transaction - Optional transaction context to perform the operation in.
302    *
303    * @return - The {@link ChampPartition} as it was stored.
304    *
305    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
306    *                                               specifed by {@link ChampGraph#retrieveSchema}
307    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
308    *                                               but the object cannot be found in the graph
309    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
310    *                                               marshalled into the backend representation
311    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
312    *                                               by this relationship does not exist in the graph
313    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
314    */
315   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
316
317   /**
318    * Removes a partition from the graph.
319    * <p>
320    * If a transaction context is not provided, then a transaction will be automatically
321    * created and committed for this operation only, otherwise, the supplied transaction
322    * will be used and it will be up to the caller to commit the transaction at its
323    * discretion.
324    *
325    * @param graph       - The partition to be removed.
326    * @param transaction - Optional transaction context to perform the operation in.
327    *
328    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
329    */
330   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
331
332   /**
333    * Create or update an object index in the graph.
334    *
335    * @param index       - The object index to be created/updated.
336    */
337   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
338   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
339   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
340   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
341   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
342   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
343   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
344   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
345   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
346   public abstract ChampSchema                      retrieveSchema();
347   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
348   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
349   public abstract void                             deleteSchema();
350   public abstract ChampCapabilities                capabilities();
351
352
353   /**
354    * Thread factory for the event producer workers.
355    */
356   private class ProducerWorkerThreadFactory implements ThreadFactory {
357
358     private AtomicInteger threadNumber = new AtomicInteger(1);
359
360     public Thread newThread(Runnable r) {
361       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
362     }
363   }
364
365
366   /**
367    * Process the configuration properties supplied for this graph instance.
368    *
369    * @param properties - Configuration parameters.
370    */
371   private void configure(Map<String, Object> properties) {
372
373     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
374
375     eventQueueCapacity =
376         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
377     eventStreamPublisherPoolSize =
378         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
379   }
380
381
382   public void setProducer(EventPublisher aProducer) {
383
384     producer = aProducer;
385   }
386
387   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
388
389     if(properties.containsKey(property)) {
390       return properties.get(property);
391     } else {
392       return defaultValue;
393     }
394   }
395
396   @Override
397   public void shutdown() {
398
399     if(publisherPool != null) {
400       publisherPool.shutdown();
401
402       try {
403         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
404       } catch (InterruptedException e) {
405         logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN, 
406             "Termination interrupted");
407         Thread.currentThread().interrupt();
408       }
409     }
410
411     if(producer != null) {
412
413       try {
414         producer.close();
415
416       } catch (Exception e) {
417         logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
418             "Failed to stop event stream producer: " + e.getMessage());
419       }
420     }
421   }
422
423   @Override
424   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
425
426     try {
427
428       // Commit the transaction.
429       transaction.commit();
430
431     } catch (ChampTransactionException e) {
432
433       logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN, 
434           "Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
435
436       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
437       for(ChampEvent event : enqueuedEvents) {
438
439         logger.debug("Graph event " + event.toString() + " not published.");
440       }
441       throw e;
442     }
443
444     // Now that the transaction has been successfully committed, we need
445     // to log the events that were produced within that transaction's
446     // context.
447     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
448     for(ChampEvent event : enqueuedEvents) {
449       logEvent(event);
450     }
451   }
452
453   @Override
454   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
455
456     // Rollback the transaction.
457     transaction.rollback();
458   }
459
460   @Override
461   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
462     return storeObject(object, Optional.empty());
463   }
464
465   @Override
466   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
467
468     ChampObject storedObject = executeStoreObject(object, transaction);
469
470     if(storedObject != null) {
471
472       logOrEnqueueEvent(ChampEvent.builder()
473                                     .operation(ChampOperation.STORE)
474                                     .entity(storedObject)
475                                     .build(),
476                         transaction);
477     }
478
479     return storedObject;
480   }
481
482   @Override
483   public ChampObject replaceObject(ChampObject object)
484       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
485
486     return replaceObject(object, Optional.empty());
487   }
488
489   @Override
490   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
491       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
492
493     ChampObject replacedObject = executeReplaceObject(object, transaction);
494
495     if(replacedObject != null) {
496
497       logOrEnqueueEvent(ChampEvent.builder()
498                                   .operation(ChampOperation.REPLACE)
499                                   .entity(replacedObject)
500                                   .build(),
501                         transaction);
502     }
503
504     return replacedObject;
505   }
506
507   @Override
508   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
509     deleteObject(key, Optional.empty());
510   }
511
512   @Override
513   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
514
515     // Retrieve the object that we are deleting before it's gone, so that we can
516     // report it to the event stream.
517     Optional<ChampObject> objectToDelete = Optional.empty();
518     try {
519       objectToDelete = retrieveObject(key, transaction);
520
521     } catch (ChampUnmarshallingException e) {
522       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
523           "Unable to generate delete object log: " + e.getMessage());
524     }
525
526     executeDeleteObject(key, transaction);
527
528     if(objectToDelete.isPresent()) {
529       // Update the event stream with the current operation.
530       logOrEnqueueEvent(ChampEvent.builder()
531                                   .operation(ChampOperation.DELETE)
532                                   .entity(objectToDelete.get())
533                                   .build(),
534                         transaction);
535     }
536   }
537
538   @Override
539   public ChampRelationship storeRelationship(ChampRelationship relationship)
540       throws ChampUnmarshallingException,
541              ChampMarshallingException,
542              ChampObjectNotExistsException,
543              ChampSchemaViolationException,
544              ChampRelationshipNotExistsException, ChampTransactionException {
545       return storeRelationship(relationship, Optional.empty());
546   }
547
548   @Override
549   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
550       throws ChampUnmarshallingException,
551              ChampMarshallingException,
552              ChampObjectNotExistsException,
553              ChampSchemaViolationException,
554              ChampRelationshipNotExistsException, ChampTransactionException {
555
556     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
557
558     if(storedRelationship != null) {
559
560       // Update the event stream with the current operation.
561       logOrEnqueueEvent(ChampEvent.builder()
562                                   .operation(ChampOperation.STORE)
563                                   .entity(storedRelationship)
564                                   .build(),
565                         transaction);
566     }
567
568     return storedRelationship;
569   }
570
571   @Override
572   public ChampRelationship replaceRelationship(ChampRelationship relationship)
573       throws ChampUnmarshallingException,
574              ChampMarshallingException,
575              ChampSchemaViolationException,
576              ChampRelationshipNotExistsException, ChampTransactionException {
577     return replaceRelationship(relationship, Optional.empty());
578   }
579
580   @Override
581   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
582       throws ChampUnmarshallingException,
583              ChampMarshallingException,
584              ChampSchemaViolationException,
585              ChampRelationshipNotExistsException, ChampTransactionException {
586
587     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
588
589     if(replacedRelationship != null) {
590
591       // Update the event stream with the current operation.
592       logOrEnqueueEvent(ChampEvent.builder()
593                                   .operation(ChampOperation.REPLACE)
594                                   .entity(replacedRelationship)
595                                   .build(),
596                         transaction);
597     }
598
599     return replacedRelationship;
600   }
601
602   @Override
603   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
604     deleteRelationship(relationship, Optional.empty());
605   }
606
607   @Override
608   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
609
610     executeDeleteRelationship(relationship, transaction);
611
612     // Update the event stream with the current operation.
613     logOrEnqueueEvent(ChampEvent.builder()
614                                 .operation(ChampOperation.DELETE)
615                                 .entity(relationship)
616                                 .build(),
617                       transaction);
618   }
619
620   @Override
621   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
622     return storePartition(partition, Optional.empty());
623   }
624
625   @Override
626   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
627
628     ChampPartition storedPartition = executeStorePartition(partition, transaction);
629
630     if(storedPartition != null) {
631
632       // Update the event stream with the current operation.
633       logOrEnqueueEvent(ChampEvent.builder()
634                                   .operation(ChampOperation.STORE)
635                                   .entity(storedPartition)
636                                   .build(),
637                         transaction);
638     }
639
640     return storedPartition;
641   }
642
643   @Override
644   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
645     deletePartition(graph, Optional.empty());
646   }
647
648   @Override
649   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
650
651     executeDeletePartition(graph, transaction);
652
653     // Update the event stream with the current operation.
654     logOrEnqueueEvent(ChampEvent.builder()
655                                 .operation(ChampOperation.DELETE)
656                                 .entity(graph)
657                                 .build(),
658                       transaction);
659   }
660
661   @Override
662   public void storeObjectIndex(ChampObjectIndex index) {
663
664     executeStoreObjectIndex(index);
665
666     // Update the event stream with the current operation.
667     logEvent(ChampEvent.builder()
668                   .operation(ChampOperation.STORE)
669                   .entity(index)
670                   .build());
671   }
672
673
674   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
675
676     // Retrieve the index that we are deleting before it's gone, so that we can
677     // report it to the event stream.
678     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
679
680     executeDeleteObjectIndex(indexName);
681
682     if(indexToDelete.isPresent()) {
683       // Update the event stream with the current operation.
684       logEvent(ChampEvent.builder()
685                     .operation(ChampOperation.DELETE)
686                     .entity(indexToDelete.get())
687                     .build());
688     }
689   }
690
691
692   public void storeRelationshipIndex(ChampRelationshipIndex index) {
693
694     executeStoreRelationshipIndex(index);
695
696     // Update the event stream with the current operation.
697     logEvent(ChampEvent.builder()
698                   .operation(ChampOperation.STORE)
699                   .entity(index)
700                   .build());
701   }
702
703
704   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
705
706     // Retrieve the index that we are deleting before it's gone, so that we can
707     // report it to the event stream.
708     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
709
710     executeDeleteRelationshipIndex(indexName);
711
712     if(indexToDelete.isPresent()) {
713       // Update the event stream with the current operation.
714       logEvent(ChampEvent.builder()
715                     .operation(ChampOperation.DELETE)
716                     .entity(indexToDelete.get())
717                     .build());
718     }
719   }
720
721   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
722
723     if(!transaction.isPresent()) {
724       // Update the event stream with the current operation.
725       logEvent(event);
726     } else {
727
728       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
729       event.setDbTransactionId ( transaction.get ().id () );
730       transaction.get().logEvent(event);
731     }
732   }
733
734   /**
735    * Submits an event to be published to the event stream.
736    *
737    * @param anEvent - The event to be published.
738    */
739   public void logEvent(ChampEvent anEvent) {
740
741     if(eventQueue == null) {
742       return;
743     }
744
745     logger.info(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_INFO, 
746         "Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
747     if(logger.isDebugEnabled()) {
748       logger.debug("Event payload: " + anEvent.toString());
749     }
750
751     // Try to submit the event to be published to the event bus.
752     if(!eventQueue.offer(anEvent)) {
753       logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
754           "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
755     }
756   }
757
758
759   /**
760    * This class implements the worker threads for our thread pool which are responsible for
761    * pulling the next outgoing event from the internal buffer and forwarding them to the event
762    * bus client.
763    * <p>
764    * Each publish operation is performed synchronously, so that the thread will only move on
765    * to the next available event once it has actually published the current event to the bus.
766    */
767   private class EventPublisherWorker implements Runnable {
768
769     /** Partition key to use when publishing events to the event stream.  We WANT all events
770      *  to go to a single partition, so we are just using a hard-coded key for every event. */
771     private static final String EVENTS_PARTITION_KEY = "champEventKey";
772
773
774     @Override
775     public void run() {
776
777       while(true) {
778         ChampEvent event = null;
779         try {
780
781           // Get the next event to be published from the queue.
782           event = eventQueue.take();
783
784         } catch (InterruptedException e) {
785
786           // Restore the interrupted status.
787           Thread.currentThread().interrupt();
788         }
789
790         // Create new envelope containing an event header and ChampEvent
791         ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
792
793         // Try publishing the event to the event bus.  This call will block until
794         try {
795           producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
796
797         } catch (Exception e) {
798
799           logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
800               "Failed to publish event to event bus: " + e.getMessage());
801         }
802       }
803     }
804   }
805 }