79a7f3c3b28881b7932f826610604baa5be8a0ac
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champcore.event;
23
24
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Optional;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.stream.Stream;
36
37 import org.onap.aai.champcore.ChampCapabilities;
38 import org.onap.aai.champcore.ChampGraph;
39 import org.onap.aai.champcore.ChampTransaction;
40 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
41 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
43 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
46 import org.onap.aai.champcore.exceptions.ChampTransactionException;
47 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPartition;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 import org.onap.aai.event.api.EventPublisher;
60
61
62
63 /**
64  * This class provides the hooks to allow Champ operations to be logged to an event
65  * stream.
66  */
67 public abstract class AbstractLoggingChampGraph implements ChampGraph {
68
69   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
70   
71   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
72   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
73   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; 
74   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
75   @Override
76   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
77   @Override
78   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
79   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
81   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
82   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
83
84   
85   /**
86     * Creates or updates a vertex in the graph data store.
87     * <p>
88     * If a transaction context is not provided, then a transaction will be automatically 
89     * created and committed for this operation only, otherwise, the supplied transaction
90     * will be used and it will be up to the caller to commit the transaction at its 
91     * discretion.
92     * 
93     * @param object      - The vertex to be created or updated.
94     * @param transaction - Optional transaction context to perform the operation in.
95     * 
96     * @return - The vertex, as created, marshaled as a {@link ChampObject}
97     * 
98     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled 
99     *                                         into the backend representation
100     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
101     *                                         by {@link ChampGraph#retrieveSchema}
102     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
103     *                                         is not present or object not found in the graph
104     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
105     */
106   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
107   
108   /**
109    * Updates an existing vertex in the graph store.
110    * <p>
111    * If a transaction context is not provided, then a transaction will be automatically 
112    * created and committed for this operation only, otherwise, the supplied transaction
113    * will be used and it will be up to the caller to commit the transaction at its 
114    * discretion.
115    * 
116    * @param object      - The vertex to be created or updated.
117    * @param transaction - Optional transaction context to perform the operation in.
118    * 
119    * @return - The updated vertex, marshaled as a {@link ChampObject}
120    * 
121    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into 
122    *                                         the backend representation
123    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
124    *                                         by {@link ChampGraph#retrieveSchema}
125    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
126    *                                         is not present or object not found in the graph
127    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
128    */
129   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
130
131   /**
132    * Deletes an existing vertex from the graph store.
133    * <p>
134    * If a transaction context is not provided, then a transaction will be automatically 
135    * created and committed for this operation only, otherwise, the supplied transaction
136    * will be used and it will be up to the caller to commit the transaction at its 
137    * discretion.
138    * 
139    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
140    * @param transaction - Optional transaction context to perform the operation in.
141    * 
142    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
143    *                                         is not present or object not found in the graph
144    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
145    */
146   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
147     
148   /**
149    * Creates or updates an edge in the graph data store.
150    * <p>
151    * If a transaction context is not provided, then a transaction will be automatically 
152    * created and committed for this operation only, otherwise, the supplied transaction
153    * will be used and it will be up to the caller to commit the transaction at its 
154    * discretion.
155    * 
156    * @param relationship - The ChampRelationship that you wish to store in the graph
157    * @param transaction  - Optional transaction context to perform the operation in.
158    * 
159    * @return - The {@link ChampRelationship} as it was stored.
160    * 
161    * @throws ChampUnmarshallingException         - If the edge which was created could not be 
162    *                                               unmarshalled into a ChampRelationship
163    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
164    *                                               marshalled into the backend representation
165    * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
166    *                                               by this relationship does not exist in the graph
167    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
168    *                                               specifed by {@link ChampGraph#retrieveSchema}
169    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
170    *                                               but the object cannot be found in the graph
171    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
172    */
173   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;  
174
175   /**
176    * Replaces an existing edge in the graph data store.
177    * <p>
178    * If a transaction context is not provided, then a transaction will be automatically 
179    * created and committed for this operation only, otherwise, the supplied transaction
180    * will be used and it will be up to the caller to commit the transaction at its 
181    * discretion.
182    * 
183    * @param relationship  - The ChampRelationship that you wish to replace in the graph
184    * @param transaction   - Optional transaction context to perform the operation in.
185    * 
186    * @return - The {@link ChampRelationship} as it was stored.
187    * 
188    * @throws ChampUnmarshallingException         - If the edge which was created could not be 
189    *                                               unmarshalled into a ChampRelationship
190    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
191    *                                               marshalled into the backend representation
192    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
193    *                                               specifed by {@link ChampGraph#retrieveSchema}
194    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
195    *                                               but the object cannot be found in the graph
196    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
197    */
198   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; 
199   
200   /**
201    * Removes an edge from the graph data store.
202    * <p>
203    * If a transaction context is not provided, then a transaction will be automatically 
204    * created and committed for this operation only, otherwise, the supplied transaction
205    * will be used and it will be up to the caller to commit the transaction at its 
206    * discretion.
207    * 
208    * @param relationship - The ChampRelationship that you wish to remove from the graph.
209    * @param transaction  - Optional transaction context to perform the operation in.
210    * 
211    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
212    *                                               but the object cannot be found in the graph
213    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
214    */
215   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
216   
217   /**
218    * Create or update a {@link ChampPartition}.
219    * <p>
220    * If a transaction context is not provided, then a transaction will be automatically 
221    * created and committed for this operation only, otherwise, the supplied transaction
222    * will be used and it will be up to the caller to commit the transaction at its 
223    * discretion.
224    *  
225    * @param partition   - The ChampPartition that you wish to create or update in the graph.
226    * @param transaction - Optional transaction context to perform the operation in.
227    * 
228    * @return - The {@link ChampPartition} as it was stored.
229    * 
230    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
231    *                                               specifed by {@link ChampGraph#retrieveSchema}
232    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
233    *                                               but the object cannot be found in the graph
234    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
235    *                                               marshalled into the backend representation
236    * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
237    *                                               by this relationship does not exist in the graph
238    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
239    */
240   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
241   
242   /**
243    * Removes a partition from the graph.
244    * <p>
245    * If a transaction context is not provided, then a transaction will be automatically 
246    * created and committed for this operation only, otherwise, the supplied transaction
247    * will be used and it will be up to the caller to commit the transaction at its 
248    * discretion.
249    * 
250    * @param graph       - The partition to be removed.
251    * @param transaction - Optional transaction context to perform the operation in.
252    * 
253    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
254    */
255   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
256   
257   /**
258    * Create or update an object index in the graph.
259    * 
260    * @param index       - The object index to be created/updated.
261    */
262   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
263   
264   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
265   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
266   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
267   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
268   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
269   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
270   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
271   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
272   public abstract ChampSchema                      retrieveSchema();
273   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
274   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
275   public abstract void                             deleteSchema();
276   public abstract ChampCapabilities                capabilities();
277
278    
279   
280   public final static String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
281   public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
282   
283   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
284   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
285   
286   public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; 
287   
288     
289   
290   /** Number of events that can be queued up for publication before we begin dropping
291    *  events. */
292   private Integer eventQueueCapacity;
293   
294   /** Number of event publisher worker threads. */
295   private Integer eventStreamPublisherPoolSize;
296   
297   /** Pool of worker threads that do the work of publishing the events to the event bus. */
298   protected ThreadPoolExecutor publisherPool;
299     
300   /** Client used for publishing events to the event bus. */
301   protected EventPublisher producer;
302   
303   /** Internal queue where outgoing events will be buffered until they can be serviced by 
304    *  the event publisher worker threads. */
305   protected BlockingQueue<ChampEvent> eventQueue;
306   
307
308   /**
309    * Thread factory for the event producer workers.
310    */
311   private class ProducerWorkerThreadFactory implements ThreadFactory {
312     
313     private AtomicInteger threadNumber = new AtomicInteger(1);
314     
315     public Thread newThread(Runnable r) {
316       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
317     }
318   }
319     
320   
321   /**
322    * Create a new instance of the AbstractLoggingChampGraph.
323    * 
324    * @param properties - Set of configuration properties for this graph instance.
325    */
326   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
327     
328     // Extract the necessary parameters from the configuration properties.
329     configure(properties);
330
331     // Make sure we were passed an event producer as one of our properties, otherwise
332     // there is really nothing more we can do...
333     if(producer == null) {
334       logger.error("No event stream producer was supplied.");
335       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
336       return;
337     }
338     
339     // Create the blocking queue that we will use to buffer events that we want
340     // published to the event bus.
341     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
342     
343     // Create the executor pool that will do the work of publishing events to the event bus.
344     publisherPool = 
345         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, 
346                                                           new ProducerWorkerThreadFactory());
347     
348     try {
349       
350       // Start up the producer worker threads.
351       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
352          publisherPool.submit(new EventPublisherWorker()); 
353       }
354       
355     } catch (Exception e) {
356       
357       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
358       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
359       return;
360     }
361   }
362
363       
364   /**
365    * Process the configuration properties supplied for this graph instance.
366    * 
367    * @param properties - Configuration parameters.
368    */
369   private void configure(Map<String, Object> properties) {
370         
371     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
372     
373     eventQueueCapacity =
374         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
375     eventStreamPublisherPoolSize = 
376         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
377   }
378   
379   
380   public void setProducer(EventPublisher aProducer) {
381     
382     producer = aProducer;
383   }
384   
385   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
386     
387     if(properties.containsKey(property)) {
388       return properties.get(property);
389     } else {
390       return defaultValue;
391     }
392   }
393   
394   @Override
395   public void shutdown() {
396     
397     if(publisherPool != null) {
398       publisherPool.shutdown();
399       
400       try {
401         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
402       } catch (InterruptedException e) {}
403     }
404     
405     if(producer != null) {
406       
407       try {
408         producer.close();
409       
410       } catch (Exception e) {
411         logger.error("Failed to stop event stream producer: " + e.getMessage());
412       }
413     }
414   }
415   
416   @Override
417   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
418  
419     try {
420       
421       // Commit the transaction.
422       transaction.commit();
423       
424     } catch (ChampTransactionException e) {
425       
426       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
427
428       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
429       for(ChampEvent event : enqueuedEvents) {
430         
431         logger.debug("Graph event " + event.toString() + " not published.");
432       }
433       throw e;
434     }
435     
436     // Now that the transaction has been successfully committed, we need
437     // to log the events that were produced within that transaction's
438     // context.
439     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
440     for(ChampEvent event : enqueuedEvents) {
441       logEvent(event);
442     }
443   }
444   
445   @Override
446   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
447     
448     // Rollback the transaction. 
449     transaction.rollback();
450   }
451   
452   @Override
453   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
454     return storeObject(object, Optional.empty());
455   }
456   
457   @Override
458   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
459     
460     ChampObject storedObject = executeStoreObject(object, transaction);
461     
462     if(storedObject != null) {
463       
464       logOrEnqueueEvent(ChampEvent.builder()
465                                     .operation(ChampOperation.STORE)
466                                     .entity(storedObject)
467                                     .build(), 
468                         transaction);
469     }
470     
471     return storedObject;
472   }
473   
474   @Override
475   public ChampObject replaceObject(ChampObject object)
476       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
477
478     return replaceObject(object, Optional.empty());
479   }
480   
481   @Override
482   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
483       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
484     
485     ChampObject replacedObject = executeReplaceObject(object, transaction);
486     
487     if(replacedObject != null) {
488       
489       logOrEnqueueEvent(ChampEvent.builder()
490                                   .operation(ChampOperation.REPLACE)
491                                   .entity(replacedObject)
492                                   .build(), 
493                         transaction);
494     }
495     
496     return replacedObject;
497   }
498   
499   @Override
500   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
501     deleteObject(key, Optional.empty());
502   }
503   
504   @Override
505   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
506
507     // Retrieve the object that we are deleting before it's gone, so that we can 
508     // report it to the event stream.
509     Optional<ChampObject> objectToDelete = Optional.empty();
510     try {
511       objectToDelete = retrieveObject(key, transaction);
512       
513     } catch (ChampUnmarshallingException e) {
514       logger.error("Unable to generate delete object log: " + e.getMessage());
515     }
516     
517     executeDeleteObject(key, transaction);
518     
519     if(objectToDelete.isPresent()) {
520       // Update the event stream with the current operation.
521       logOrEnqueueEvent(ChampEvent.builder()
522                                   .operation(ChampOperation.DELETE)
523                                   .entity(objectToDelete.get())
524                                   .build(),
525                         transaction);
526     }
527   }
528  
529   @Override
530   public ChampRelationship storeRelationship(ChampRelationship relationship)
531       throws ChampUnmarshallingException, 
532              ChampMarshallingException, 
533              ChampObjectNotExistsException, 
534              ChampSchemaViolationException, 
535              ChampRelationshipNotExistsException, ChampTransactionException {  
536       return storeRelationship(relationship, Optional.empty());
537   }
538   
539   @Override
540   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
541       throws ChampUnmarshallingException, 
542              ChampMarshallingException, 
543              ChampObjectNotExistsException, 
544              ChampSchemaViolationException, 
545              ChampRelationshipNotExistsException, ChampTransactionException {  
546
547     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
548     
549     if(storedRelationship != null) {
550       
551       // Update the event stream with the current operation.
552       logOrEnqueueEvent(ChampEvent.builder()
553                                   .operation(ChampOperation.STORE)
554                                   .entity(storedRelationship)
555                                   .build(),
556                         transaction);
557     }
558     
559     return storedRelationship;
560   }
561
562   @Override
563   public ChampRelationship replaceRelationship(ChampRelationship relationship)
564       throws ChampUnmarshallingException, 
565              ChampMarshallingException, 
566              ChampSchemaViolationException, 
567              ChampRelationshipNotExistsException, ChampTransactionException { 
568     return replaceRelationship(relationship, Optional.empty());
569   }
570   
571   @Override
572   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
573       throws ChampUnmarshallingException, 
574              ChampMarshallingException, 
575              ChampSchemaViolationException, 
576              ChampRelationshipNotExistsException, ChampTransactionException { 
577
578     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
579     
580     if(replacedRelationship != null) {
581       
582       // Update the event stream with the current operation.
583       logOrEnqueueEvent(ChampEvent.builder()
584                                   .operation(ChampOperation.REPLACE)
585                                   .entity(replacedRelationship)
586                                   .build(),
587                         transaction);
588     }
589     
590     return replacedRelationship;
591   }
592   
593   @Override
594   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
595     deleteRelationship(relationship, Optional.empty());
596   }
597   
598   @Override
599   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
600
601     executeDeleteRelationship(relationship, transaction);
602     
603     // Update the event stream with the current operation.
604     logOrEnqueueEvent(ChampEvent.builder()
605                                 .operation(ChampOperation.DELETE)
606                                 .entity(relationship)
607                                 .build(),
608                       transaction);
609   }
610   
611   @Override
612   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
613     return storePartition(partition, Optional.empty());
614   }
615   
616   @Override
617   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
618
619     ChampPartition storedPartition = executeStorePartition(partition, transaction);
620     
621     if(storedPartition != null) {
622       
623       // Update the event stream with the current operation.
624       logOrEnqueueEvent(ChampEvent.builder()
625                                   .operation(ChampOperation.STORE)
626                                   .entity(storedPartition)
627                                   .build(),
628                         transaction);
629     }
630     
631     return storedPartition;
632   }
633   
634   @Override
635   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
636     deletePartition(graph, Optional.empty());
637   }
638   
639   @Override
640   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
641
642     executeDeletePartition(graph, transaction);
643     
644     // Update the event stream with the current operation.
645     logOrEnqueueEvent(ChampEvent.builder()
646                                 .operation(ChampOperation.DELETE)
647                                 .entity(graph)
648                                 .build(),
649                       transaction);
650   }
651   
652   @Override
653   public void storeObjectIndex(ChampObjectIndex index) {
654
655     executeStoreObjectIndex(index);
656     
657     // Update the event stream with the current operation.
658     logEvent(ChampEvent.builder()
659                   .operation(ChampOperation.STORE)
660                   .entity(index)
661                   .build());
662   }
663   
664   
665   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
666     
667     // Retrieve the index that we are deleting before it's gone, so that we can 
668     // report it to the event stream.
669     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
670     
671     executeDeleteObjectIndex(indexName);
672     
673     if(indexToDelete.isPresent()) {
674       // Update the event stream with the current operation.
675       logEvent(ChampEvent.builder()
676                     .operation(ChampOperation.DELETE)
677                     .entity(indexToDelete.get()) 
678                     .build());
679     }
680   }
681   
682   
683   public void storeRelationshipIndex(ChampRelationshipIndex index) {
684
685     executeStoreRelationshipIndex(index);
686     
687     // Update the event stream with the current operation.
688     logEvent(ChampEvent.builder()
689                   .operation(ChampOperation.STORE)
690                   .entity(index) 
691                   .build());
692   }
693   
694   
695   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
696
697     // Retrieve the index that we are deleting before it's gone, so that we can 
698     // report it to the event stream.
699     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
700     
701     executeDeleteRelationshipIndex(indexName);
702     
703     if(indexToDelete.isPresent()) {
704       // Update the event stream with the current operation.
705       logEvent(ChampEvent.builder()
706                     .operation(ChampOperation.DELETE)
707                     .entity(indexToDelete.get()) 
708                     .build());
709     }
710   }
711   
712   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
713     
714     if(!transaction.isPresent()) {
715       // Update the event stream with the current operation.
716       logEvent(event);
717     } else {
718
719       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
720       event.setDbTransactionId ( transaction.get ().id () );
721       transaction.get().logEvent(event);
722     }
723   }
724   
725   /**
726    * Submits an event to be published to the event stream.
727    * 
728    * @param anEvent - The event to be published.
729    */
730   public void logEvent(ChampEvent anEvent) {
731     
732     if(eventQueue == null) {
733       return;
734     }
735     
736     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
737     if(logger.isDebugEnabled()) {
738       logger.debug("Event payload: " + anEvent.toString());
739     }
740     
741     // Try to submit the event to be published to the event bus.
742     if(!eventQueue.offer(anEvent)) {
743       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
744     }
745   }
746   
747   
748   /**
749    * This class implements the worker threads for our thread pool which are responsible for 
750    * pulling the next outgoing event from the internal buffer and forwarding them to the event
751    * bus client. 
752    * <p>
753    * Each publish operation is performed synchronously, so that the thread will only move on
754    * to the next available event once it has actually published the current event to the bus.
755    */
756   private class EventPublisherWorker implements Runnable {
757
758     /** Partition key to use when publishing events to the event stream.  We WANT all events
759      *  to go to a single partition, so we are just using a hard-coded key for every event. */
760     private static final String EVENTS_PARTITION_KEY = "champEventKey";
761     
762     
763     @Override
764     public void run() {
765       
766       while(true) { 
767         
768         ChampEvent event = null;
769         try {
770           
771           // Get the next event to be published from the queue.
772           event = eventQueue.take();
773                
774         } catch (InterruptedException e) {
775          
776           // Restore the interrupted status.
777           Thread.currentThread().interrupt();
778         }
779         
780         // Try publishing the event to the event bus.  This call will block
781         // until 
782         try {
783           producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
784           
785         } catch (Exception e) {
786   
787           logger.error("Failed to publish event to event bus: " + e.getMessage());
788         }
789       }
790     }
791   }
792 }