Update payload format for update notification
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
index 07647d2..1f93a97 100644 (file)
@@ -32,11 +32,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
-
 import org.onap.aai.champcore.ChampCapabilities;
 import org.onap.aai.champcore.ChampGraph;
 import org.onap.aai.champcore.ChampTransaction;
 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
+import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
+import org.onap.aai.champcore.event.envelope.ChampEventHeader;
 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
@@ -52,11 +53,10 @@ import org.onap.aai.champcore.model.ChampRelationship;
 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
 import org.onap.aai.champcore.model.ChampRelationshipIndex;
 import org.onap.aai.champcore.model.ChampSchema;
+import org.onap.aai.event.api.EventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.onap.aai.event.api.EventPublisher;
-
 
 
 /**
@@ -66,10 +66,10 @@ import org.onap.aai.event.api.EventPublisher;
 public abstract class AbstractLoggingChampGraph implements ChampGraph {
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
-  
+
   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
-  public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; 
+  public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
   @Override
   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
@@ -80,48 +80,48 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
 
-  
+
   /**
     * Creates or updates a vertex in the graph data store.
     * <p>
-    * If a transaction context is not provided, then a transaction will be automatically 
+    * If a transaction context is not provided, then a transaction will be automatically
     * created and committed for this operation only, otherwise, the supplied transaction
-    * will be used and it will be up to the caller to commit the transaction at its 
+    * will be used and it will be up to the caller to commit the transaction at its
     * discretion.
-    * 
+    *
     * @param object      - The vertex to be created or updated.
     * @param transaction - Optional transaction context to perform the operation in.
-    * 
+    *
     * @return - The vertex, as created, marshaled as a {@link ChampObject}
-    * 
-    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled 
+    *
+    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
     *                                         into the backend representation
-    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
+    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
     *                                         by {@link ChampGraph#retrieveSchema}
-    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
+    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
     *                                         is not present or object not found in the graph
     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
     */
   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
-  
+
   /**
    * Updates an existing vertex in the graph store.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param object      - The vertex to be created or updated.
    * @param transaction - Optional transaction context to perform the operation in.
-   * 
+   *
    * @return - The updated vertex, marshaled as a {@link ChampObject}
-   * 
-   * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into 
+   *
+   * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
    *                                         the backend representation
-   * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
+   * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
    *                                         by {@link ChampGraph#retrieveSchema}
-   * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
+   * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
    *                                         is not present or object not found in the graph
    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
    */
@@ -130,136 +130,136 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   /**
    * Deletes an existing vertex from the graph store.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
    * @param transaction - Optional transaction context to perform the operation in.
-   * 
-   * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
+   *
+   * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
    *                                         is not present or object not found in the graph
    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
    */
   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
-    
+
   /**
    * Creates or updates an edge in the graph data store.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param relationship - The ChampRelationship that you wish to store in the graph
    * @param transaction  - Optional transaction context to perform the operation in.
-   * 
+   *
    * @return - The {@link ChampRelationship} as it was stored.
-   * 
-   * @throws ChampUnmarshallingException         - If the edge which was created could not be 
+   *
+   * @throws ChampUnmarshallingException         - If the edge which was created could not be
    *                                               unmarshalled into a ChampRelationship
-   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
+   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
    *                                               marshalled into the backend representation
-   * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
+   * @throws ChampObjectNotExistsException       - If either the source or target object referenced
    *                                               by this relationship does not exist in the graph
-   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
+   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
    *                                               specifed by {@link ChampGraph#retrieveSchema}
-   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
+   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
    *                                               but the object cannot be found in the graph
    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
    */
-  public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;  
+  public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
 
   /**
    * Replaces an existing edge in the graph data store.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param relationship  - The ChampRelationship that you wish to replace in the graph
    * @param transaction   - Optional transaction context to perform the operation in.
-   * 
+   *
    * @return - The {@link ChampRelationship} as it was stored.
-   * 
-   * @throws ChampUnmarshallingException         - If the edge which was created could not be 
+   *
+   * @throws ChampUnmarshallingException         - If the edge which was created could not be
    *                                               unmarshalled into a ChampRelationship
-   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
+   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
    *                                               marshalled into the backend representation
-   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
+   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
    *                                               specifed by {@link ChampGraph#retrieveSchema}
-   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
+   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
    *                                               but the object cannot be found in the graph
    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
    */
-  public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; 
-  
+  public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
+
   /**
    * Removes an edge from the graph data store.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param relationship - The ChampRelationship that you wish to remove from the graph.
    * @param transaction  - Optional transaction context to perform the operation in.
-   * 
-   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
+   *
+   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
    *                                               but the object cannot be found in the graph
    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
    */
   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
-  
+
   /**
    * Create or update a {@link ChampPartition}.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   *  
+   *
    * @param partition   - The ChampPartition that you wish to create or update in the graph.
    * @param transaction - Optional transaction context to perform the operation in.
-   * 
+   *
    * @return - The {@link ChampPartition} as it was stored.
-   * 
-   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
+   *
+   * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
    *                                               specifed by {@link ChampGraph#retrieveSchema}
-   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
+   * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
    *                                               but the object cannot be found in the graph
-   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
+   * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
    *                                               marshalled into the backend representation
-   * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
+   * @throws ChampObjectNotExistsException       - If either the source or target object referenced
    *                                               by this relationship does not exist in the graph
    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
    */
   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
-  
+
   /**
    * Removes a partition from the graph.
    * <p>
-   * If a transaction context is not provided, then a transaction will be automatically 
+   * If a transaction context is not provided, then a transaction will be automatically
    * created and committed for this operation only, otherwise, the supplied transaction
-   * will be used and it will be up to the caller to commit the transaction at its 
+   * will be used and it will be up to the caller to commit the transaction at its
    * discretion.
-   * 
+   *
    * @param graph       - The partition to be removed.
    * @param transaction - Optional transaction context to perform the operation in.
-   * 
+   *
    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
    */
   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
-  
+
   /**
    * Create or update an object index in the graph.
-   * 
+   *
    * @param index       - The object index to be created/updated.
    */
   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
-  
+
   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
@@ -274,56 +274,56 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   public abstract void                             deleteSchema();
   public abstract ChampCapabilities                capabilities();
 
-   
-  
+
+
   public final static String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
   public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
-  
+
   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
-  
-  public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; 
-  
-    
-  
+
+  public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+
+
+
   /** Number of events that can be queued up for publication before we begin dropping
    *  events. */
   private Integer eventQueueCapacity;
-  
+
   /** Number of event publisher worker threads. */
   private Integer eventStreamPublisherPoolSize;
-  
+
   /** Pool of worker threads that do the work of publishing the events to the event bus. */
   protected ThreadPoolExecutor publisherPool;
-    
+
   /** Client used for publishing events to the event bus. */
   protected EventPublisher producer;
-  
-  /** Internal queue where outgoing events will be buffered until they can be serviced by 
+
+  /** Internal queue where outgoing events will be buffered until they can be serviced by
    *  the event publisher worker threads. */
   protected BlockingQueue<ChampEvent> eventQueue;
-  
+
 
   /**
    * Thread factory for the event producer workers.
    */
   private class ProducerWorkerThreadFactory implements ThreadFactory {
-    
+
     private AtomicInteger threadNumber = new AtomicInteger(1);
-    
+
     public Thread newThread(Runnable r) {
       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
     }
   }
-    
-  
+
+
   /**
    * Create a new instance of the AbstractLoggingChampGraph.
-   * 
+   *
    * @param properties - Set of configuration properties for this graph instance.
    */
   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
-    
+
     // Extract the necessary parameters from the configuration properties.
     configure(properties);
 
@@ -334,104 +334,104 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
       return;
     }
-    
+
     // Create the blocking queue that we will use to buffer events that we want
     // published to the event bus.
     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
-    
+
     // Create the executor pool that will do the work of publishing events to the event bus.
-    publisherPool = 
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, 
+    publisherPool =
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
                                                           new ProducerWorkerThreadFactory());
-    
+
     try {
-      
+
       // Start up the producer worker threads.
       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
-         publisherPool.submit(new EventPublisherWorker()); 
+         publisherPool.submit(new EventPublisherWorker());
       }
-      
+
     } catch (Exception e) {
-      
+
       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
       return;
     }
   }
 
-      
+
   /**
    * Process the configuration properties supplied for this graph instance.
-   * 
+   *
    * @param properties - Configuration parameters.
    */
   private void configure(Map<String, Object> properties) {
-        
+
     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
-    
+
     eventQueueCapacity =
         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
-    eventStreamPublisherPoolSize = 
+    eventStreamPublisherPoolSize =
         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
   }
-  
-  
+
+
   public void setProducer(EventPublisher aProducer) {
-    
+
     producer = aProducer;
   }
-  
+
   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
-    
+
     if(properties.containsKey(property)) {
       return properties.get(property);
     } else {
       return defaultValue;
     }
   }
-  
+
   @Override
   public void shutdown() {
-    
+
     if(publisherPool != null) {
       publisherPool.shutdown();
-      
+
       try {
         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {}
     }
-    
+
     if(producer != null) {
-      
+
       try {
         producer.close();
-      
+
       } catch (Exception e) {
         logger.error("Failed to stop event stream producer: " + e.getMessage());
       }
     }
   }
-  
+
   @Override
   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
+
     try {
-      
+
       // Commit the transaction.
       transaction.commit();
-      
+
     } catch (ChampTransactionException e) {
-      
+
       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
 
       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
       for(ChampEvent event : enqueuedEvents) {
-        
+
         logger.debug("Graph event " + event.toString() + " not published.");
       }
       throw e;
     }
-    
+
     // Now that the transaction has been successfully committed, we need
     // to log the events that were produced within that transaction's
     // context.
@@ -440,81 +440,81 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       logEvent(event);
     }
   }
-  
+
   @Override
   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
-    
-    // Rollback the transaction. 
+
+    // Rollback the transaction.
     transaction.rollback();
   }
-  
+
   @Override
   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
     return storeObject(object, Optional.empty());
   }
-  
+
   @Override
   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-    
+
     ChampObject storedObject = executeStoreObject(object, transaction);
-    
+
     if(storedObject != null) {
-      
+
       logOrEnqueueEvent(ChampEvent.builder()
                                     .operation(ChampOperation.STORE)
                                     .entity(storedObject)
-                                    .build(), 
+                                    .build(),
                         transaction);
     }
-    
+
     return storedObject;
   }
-  
+
   @Override
   public ChampObject replaceObject(ChampObject object)
       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
 
     return replaceObject(object, Optional.empty());
   }
-  
+
   @Override
   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
-    
+
     ChampObject replacedObject = executeReplaceObject(object, transaction);
-    
+
     if(replacedObject != null) {
-      
+
       logOrEnqueueEvent(ChampEvent.builder()
                                   .operation(ChampOperation.REPLACE)
                                   .entity(replacedObject)
-                                  .build(), 
+                                  .build(),
                         transaction);
     }
-    
+
     return replacedObject;
   }
-  
+
   @Override
   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
     deleteObject(key, Optional.empty());
   }
-  
+
   @Override
   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
 
-    // Retrieve the object that we are deleting before it's gone, so that we can 
+    // Retrieve the object that we are deleting before it's gone, so that we can
     // report it to the event stream.
     Optional<ChampObject> objectToDelete = Optional.empty();
     try {
       objectToDelete = retrieveObject(key, transaction);
-      
+
     } catch (ChampUnmarshallingException e) {
       logger.error("Unable to generate delete object log: " + e.getMessage());
     }
-    
+
     executeDeleteObject(key, transaction);
-    
+
     if(objectToDelete.isPresent()) {
       // Update the event stream with the current operation.
       logOrEnqueueEvent(ChampEvent.builder()
@@ -524,29 +524,29 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                         transaction);
     }
   }
+
   @Override
   public ChampRelationship storeRelationship(ChampRelationship relationship)
-      throws ChampUnmarshallingException, 
-             ChampMarshallingException, 
-             ChampObjectNotExistsException, 
-             ChampSchemaViolationException, 
-             ChampRelationshipNotExistsException, ChampTransactionException {  
+      throws ChampUnmarshallingException,
+             ChampMarshallingException,
+             ChampObjectNotExistsException,
+             ChampSchemaViolationException,
+             ChampRelationshipNotExistsException, ChampTransactionException {
       return storeRelationship(relationship, Optional.empty());
   }
-  
+
   @Override
   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
-      throws ChampUnmarshallingException, 
-             ChampMarshallingException, 
-             ChampObjectNotExistsException, 
-             ChampSchemaViolationException, 
-             ChampRelationshipNotExistsException, ChampTransactionException {  
+      throws ChampUnmarshallingException,
+             ChampMarshallingException,
+             ChampObjectNotExistsException,
+             ChampSchemaViolationException,
+             ChampRelationshipNotExistsException, ChampTransactionException {
 
     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
-    
+
     if(storedRelationship != null) {
-      
+
       // Update the event stream with the current operation.
       logOrEnqueueEvent(ChampEvent.builder()
                                   .operation(ChampOperation.STORE)
@@ -554,30 +554,30 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                                   .build(),
                         transaction);
     }
-    
+
     return storedRelationship;
   }
 
   @Override
   public ChampRelationship replaceRelationship(ChampRelationship relationship)
-      throws ChampUnmarshallingException, 
-             ChampMarshallingException, 
-             ChampSchemaViolationException, 
-             ChampRelationshipNotExistsException, ChampTransactionException { 
+      throws ChampUnmarshallingException,
+             ChampMarshallingException,
+             ChampSchemaViolationException,
+             ChampRelationshipNotExistsException, ChampTransactionException {
     return replaceRelationship(relationship, Optional.empty());
   }
-  
+
   @Override
   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
-      throws ChampUnmarshallingException, 
-             ChampMarshallingException, 
-             ChampSchemaViolationException, 
-             ChampRelationshipNotExistsException, ChampTransactionException { 
+      throws ChampUnmarshallingException,
+             ChampMarshallingException,
+             ChampSchemaViolationException,
+             ChampRelationshipNotExistsException, ChampTransactionException {
 
     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
-    
+
     if(replacedRelationship != null) {
-      
+
       // Update the event stream with the current operation.
       logOrEnqueueEvent(ChampEvent.builder()
                                   .operation(ChampOperation.REPLACE)
@@ -585,20 +585,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                                   .build(),
                         transaction);
     }
-    
+
     return replacedRelationship;
   }
-  
+
   @Override
   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
     deleteRelationship(relationship, Optional.empty());
   }
-  
+
   @Override
   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
 
     executeDeleteRelationship(relationship, transaction);
-    
+
     // Update the event stream with the current operation.
     logOrEnqueueEvent(ChampEvent.builder()
                                 .operation(ChampOperation.DELETE)
@@ -606,19 +606,19 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                                 .build(),
                       transaction);
   }
-  
+
   @Override
   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
     return storePartition(partition, Optional.empty());
   }
-  
+
   @Override
   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
 
     ChampPartition storedPartition = executeStorePartition(partition, transaction);
-    
+
     if(storedPartition != null) {
-      
+
       // Update the event stream with the current operation.
       logOrEnqueueEvent(ChampEvent.builder()
                                   .operation(ChampOperation.STORE)
@@ -626,20 +626,20 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                                   .build(),
                         transaction);
     }
-    
+
     return storedPartition;
   }
-  
+
   @Override
   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
     deletePartition(graph, Optional.empty());
   }
-  
+
   @Override
   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
 
     executeDeletePartition(graph, transaction);
-    
+
     // Update the event stream with the current operation.
     logOrEnqueueEvent(ChampEvent.builder()
                                 .operation(ChampOperation.DELETE)
@@ -647,69 +647,69 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
                                 .build(),
                       transaction);
   }
-  
+
   @Override
   public void storeObjectIndex(ChampObjectIndex index) {
 
     executeStoreObjectIndex(index);
-    
+
     // Update the event stream with the current operation.
     logEvent(ChampEvent.builder()
                   .operation(ChampOperation.STORE)
                   .entity(index)
                   .build());
   }
-  
-  
+
+
   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
-    
-    // Retrieve the index that we are deleting before it's gone, so that we can 
+
+    // Retrieve the index that we are deleting before it's gone, so that we can
     // report it to the event stream.
     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
-    
+
     executeDeleteObjectIndex(indexName);
-    
+
     if(indexToDelete.isPresent()) {
       // Update the event stream with the current operation.
       logEvent(ChampEvent.builder()
                     .operation(ChampOperation.DELETE)
-                    .entity(indexToDelete.get()) 
+                    .entity(indexToDelete.get())
                     .build());
     }
   }
-  
-  
+
+
   public void storeRelationshipIndex(ChampRelationshipIndex index) {
 
     executeStoreRelationshipIndex(index);
-    
+
     // Update the event stream with the current operation.
     logEvent(ChampEvent.builder()
                   .operation(ChampOperation.STORE)
-                  .entity(index) 
+                  .entity(index)
                   .build());
   }
-  
-  
+
+
   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
 
-    // Retrieve the index that we are deleting before it's gone, so that we can 
+    // Retrieve the index that we are deleting before it's gone, so that we can
     // report it to the event stream.
     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
-    
+
     executeDeleteRelationshipIndex(indexName);
-    
+
     if(indexToDelete.isPresent()) {
       // Update the event stream with the current operation.
       logEvent(ChampEvent.builder()
                     .operation(ChampOperation.DELETE)
-                    .entity(indexToDelete.get()) 
+                    .entity(indexToDelete.get())
                     .build());
     }
   }
-  
+
   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
-    
+
     if(!transaction.isPresent()) {
       // Update the event stream with the current operation.
       logEvent(event);
@@ -720,34 +720,34 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       transaction.get().logEvent(event);
     }
   }
-  
+
   /**
    * Submits an event to be published to the event stream.
-   * 
+   *
    * @param anEvent - The event to be published.
    */
   public void logEvent(ChampEvent anEvent) {
-    
+
     if(eventQueue == null) {
       return;
     }
-    
+
     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
     if(logger.isDebugEnabled()) {
       logger.debug("Event payload: " + anEvent.toString());
     }
-    
+
     // Try to submit the event to be published to the event bus.
     if(!eventQueue.offer(anEvent)) {
       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
     }
   }
-  
-  
+
+
   /**
-   * This class implements the worker threads for our thread pool which are responsible for 
+   * This class implements the worker threads for our thread pool which are responsible for
    * pulling the next outgoing event from the internal buffer and forwarding them to the event
-   * bus client. 
+   * bus client.
    * <p>
    * Each publish operation is performed synchronously, so that the thread will only move on
    * to the next available event once it has actually published the current event to the bus.
@@ -757,32 +757,33 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
     /** Partition key to use when publishing events to the event stream.  We WANT all events
      *  to go to a single partition, so we are just using a hard-coded key for every event. */
     private static final String EVENTS_PARTITION_KEY = "champEventKey";
-    
-    
+
+
     @Override
     public void run() {
-      
-      while(true) { 
-        
+
+      while(true) {
         ChampEvent event = null;
         try {
-          
+
           // Get the next event to be published from the queue.
           event = eventQueue.take();
-               
+
         } catch (InterruptedException e) {
-         
+
           // Restore the interrupted status.
           Thread.currentThread().interrupt();
         }
-        
-        // Try publishing the event to the event bus.  This call will block
-        // until 
+
+        // Create new envelope containing an event header and ChampEvent
+        ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
+
+        // Try publishing the event to the event bus.  This call will block until
         try {
-          producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
-          
+          producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
+
         } catch (Exception e) {
-  
+
           logger.error("Failed to publish event to event bus: " + e.getMessage());
         }
       }