Update license date and text
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampGraph;
38 import org.onap.aai.champcore.ChampTransaction;
39 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import org.onap.aai.event.api.EventPublisher;
59
60
61
62 /**
63  * This class provides the hooks to allow Champ operations to be logged to an event
64  * stream.
65  */
66 public abstract class AbstractLoggingChampGraph implements ChampGraph {
67
68   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
69   
70   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
71   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
72   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException; 
73   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
74   @Override
75   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
76   @Override
77   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
78   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
79   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
81   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
82
83   
84   /**
85     * Creates or updates a vertex in the graph data store.
86     * <p>
87     * If a transaction context is not provided, then a transaction will be automatically 
88     * created and committed for this operation only, otherwise, the supplied transaction
89     * will be used and it will be up to the caller to commit the transaction at its 
90     * discretion.
91     * 
92     * @param object      - The vertex to be created or updated.
93     * @param transaction - Optional transaction context to perform the operation in.
94     * 
95     * @return - The vertex, as created, marshaled as a {@link ChampObject}
96     * 
97     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled 
98     *                                         into the backend representation
99     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
100     *                                         by {@link ChampGraph#retrieveSchema}
101     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
102     *                                         is not present or object not found in the graph
103     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
104     */
105   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
106   
107   /**
108    * Updates an existing vertex in the graph store.
109    * <p>
110    * If a transaction context is not provided, then a transaction will be automatically 
111    * created and committed for this operation only, otherwise, the supplied transaction
112    * will be used and it will be up to the caller to commit the transaction at its 
113    * discretion.
114    * 
115    * @param object      - The vertex to be created or updated.
116    * @param transaction - Optional transaction context to perform the operation in.
117    * 
118    * @return - The updated vertex, marshaled as a {@link ChampObject}
119    * 
120    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into 
121    *                                         the backend representation
122    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed 
123    *                                         by {@link ChampGraph#retrieveSchema}
124    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
125    *                                         is not present or object not found in the graph
126    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
127    */
128   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
129
130   /**
131    * Deletes an existing vertex from the graph store.
132    * <p>
133    * If a transaction context is not provided, then a transaction will be automatically 
134    * created and committed for this operation only, otherwise, the supplied transaction
135    * will be used and it will be up to the caller to commit the transaction at its 
136    * discretion.
137    * 
138    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
139    * @param transaction - Optional transaction context to perform the operation in.
140    * 
141    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey} 
142    *                                         is not present or object not found in the graph
143    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
144    */
145   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
146     
147   /**
148    * Creates or updates an edge in the graph data store.
149    * <p>
150    * If a transaction context is not provided, then a transaction will be automatically 
151    * created and committed for this operation only, otherwise, the supplied transaction
152    * will be used and it will be up to the caller to commit the transaction at its 
153    * discretion.
154    * 
155    * @param relationship - The ChampRelationship that you wish to store in the graph
156    * @param transaction  - Optional transaction context to perform the operation in.
157    * 
158    * @return - The {@link ChampRelationship} as it was stored.
159    * 
160    * @throws ChampUnmarshallingException         - If the edge which was created could not be 
161    *                                               unmarshalled into a ChampRelationship
162    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
163    *                                               marshalled into the backend representation
164    * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
165    *                                               by this relationship does not exist in the graph
166    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
167    *                                               specifed by {@link ChampGraph#retrieveSchema}
168    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
169    *                                               but the object cannot be found in the graph
170    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
171    */
172   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;  
173
174   /**
175    * Replaces an existing edge in the graph data store.
176    * <p>
177    * If a transaction context is not provided, then a transaction will be automatically 
178    * created and committed for this operation only, otherwise, the supplied transaction
179    * will be used and it will be up to the caller to commit the transaction at its 
180    * discretion.
181    * 
182    * @param relationship  - The ChampRelationship that you wish to replace in the graph
183    * @param transaction   - Optional transaction context to perform the operation in.
184    * 
185    * @return - The {@link ChampRelationship} as it was stored.
186    * 
187    * @throws ChampUnmarshallingException         - If the edge which was created could not be 
188    *                                               unmarshalled into a ChampRelationship
189    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
190    *                                               marshalled into the backend representation
191    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
192    *                                               specifed by {@link ChampGraph#retrieveSchema}
193    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
194    *                                               but the object cannot be found in the graph
195    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
196    */
197   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException; 
198   
199   /**
200    * Removes an edge from the graph data store.
201    * <p>
202    * If a transaction context is not provided, then a transaction will be automatically 
203    * created and committed for this operation only, otherwise, the supplied transaction
204    * will be used and it will be up to the caller to commit the transaction at its 
205    * discretion.
206    * 
207    * @param relationship - The ChampRelationship that you wish to remove from the graph.
208    * @param transaction  - Optional transaction context to perform the operation in.
209    * 
210    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
211    *                                               but the object cannot be found in the graph
212    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
213    */
214   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
215   
216   /**
217    * Create or update a {@link ChampPartition}.
218    * <p>
219    * If a transaction context is not provided, then a transaction will be automatically 
220    * created and committed for this operation only, otherwise, the supplied transaction
221    * will be used and it will be up to the caller to commit the transaction at its 
222    * discretion.
223    *  
224    * @param partition   - The ChampPartition that you wish to create or update in the graph.
225    * @param transaction - Optional transaction context to perform the operation in.
226    * 
227    * @return - The {@link ChampPartition} as it was stored.
228    * 
229    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints 
230    *                                               specifed by {@link ChampGraph#retrieveSchema}
231    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent() 
232    *                                               but the object cannot be found in the graph
233    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be 
234    *                                               marshalled into the backend representation
235    * @throws ChampObjectNotExistsException       - If either the source or target object referenced 
236    *                                               by this relationship does not exist in the graph
237    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
238    */
239   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
240   
241   /**
242    * Removes a partition from the graph.
243    * <p>
244    * If a transaction context is not provided, then a transaction will be automatically 
245    * created and committed for this operation only, otherwise, the supplied transaction
246    * will be used and it will be up to the caller to commit the transaction at its 
247    * discretion.
248    * 
249    * @param graph       - The partition to be removed.
250    * @param transaction - Optional transaction context to perform the operation in.
251    * 
252    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
253    */
254   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
255   
256   /**
257    * Create or update an object index in the graph.
258    * 
259    * @param index       - The object index to be created/updated.
260    */
261   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
262   
263   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
264   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
265   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
266   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
267   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
268   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
269   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
270   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
271   public abstract ChampSchema                      retrieveSchema();
272   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
273   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
274   public abstract void                             deleteSchema();
275   public abstract ChampCapabilities                capabilities();
276
277    
278   
279   public final static String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
280   public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
281   
282   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
283   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
284   
285   public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher"; 
286   
287     
288   
289   /** Number of events that can be queued up for publication before we begin dropping
290    *  events. */
291   private Integer eventQueueCapacity;
292   
293   /** Number of event publisher worker threads. */
294   private Integer eventStreamPublisherPoolSize;
295   
296   /** Pool of worker threads that do the work of publishing the events to the event bus. */
297   protected ThreadPoolExecutor publisherPool;
298     
299   /** Client used for publishing events to the event bus. */
300   protected EventPublisher producer;
301   
302   /** Internal queue where outgoing events will be buffered until they can be serviced by 
303    *  the event publisher worker threads. */
304   protected BlockingQueue<ChampEvent> eventQueue;
305   
306
307   /**
308    * Thread factory for the event producer workers.
309    */
310   private class ProducerWorkerThreadFactory implements ThreadFactory {
311     
312     private AtomicInteger threadNumber = new AtomicInteger(1);
313     
314     public Thread newThread(Runnable r) {
315       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
316     }
317   }
318     
319   
320   /**
321    * Create a new instance of the AbstractLoggingChampGraph.
322    * 
323    * @param properties - Set of configuration properties for this graph instance.
324    */
325   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
326     
327     // Extract the necessary parameters from the configuration properties.
328     configure(properties);
329
330     // Make sure we were passed an event producer as one of our properties, otherwise
331     // there is really nothing more we can do...
332     if(producer == null) {
333       logger.error("No event stream producer was supplied.");
334       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
335       return;
336     }
337     
338     // Create the blocking queue that we will use to buffer events that we want
339     // published to the event bus.
340     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
341     
342     // Create the executor pool that will do the work of publishing events to the event bus.
343     publisherPool = 
344         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize, 
345                                                           new ProducerWorkerThreadFactory());
346     
347     try {
348       
349       // Start up the producer worker threads.
350       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
351          publisherPool.submit(new EventPublisherWorker()); 
352       }
353       
354     } catch (Exception e) {
355       
356       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
357       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
358       return;
359     }
360   }
361
362       
363   /**
364    * Process the configuration properties supplied for this graph instance.
365    * 
366    * @param properties - Configuration parameters.
367    */
368   private void configure(Map<String, Object> properties) {
369         
370     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
371     
372     eventQueueCapacity =
373         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
374     eventStreamPublisherPoolSize = 
375         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
376   }
377   
378   
379   public void setProducer(EventPublisher aProducer) {
380     
381     producer = aProducer;
382   }
383   
384   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
385     
386     if(properties.containsKey(property)) {
387       return properties.get(property);
388     } else {
389       return defaultValue;
390     }
391   }
392   
393   @Override
394   public void shutdown() {
395     
396     if(publisherPool != null) {
397       publisherPool.shutdown();
398       
399       try {
400         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
401       } catch (InterruptedException e) {}
402     }
403     
404     if(producer != null) {
405       
406       try {
407         producer.close();
408       
409       } catch (Exception e) {
410         logger.error("Failed to stop event stream producer: " + e.getMessage());
411       }
412     }
413   }
414   
415   @Override
416   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
417  
418     try {
419       
420       // Commit the transaction.
421       transaction.commit();
422       
423     } catch (ChampTransactionException e) {
424       
425       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
426
427       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
428       for(ChampEvent event : enqueuedEvents) {
429         
430         logger.debug("Graph event " + event.toString() + " not published.");
431       }
432       throw e;
433     }
434     
435     // Now that the transaction has been successfully committed, we need
436     // to log the events that were produced within that transaction's
437     // context.
438     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
439     for(ChampEvent event : enqueuedEvents) {
440       logEvent(event);
441     }
442   }
443   
444   @Override
445   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
446     
447     // Rollback the transaction. 
448     transaction.rollback();
449   }
450   
451   @Override
452   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
453     return storeObject(object, Optional.empty());
454   }
455   
456   @Override
457   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
458     
459     ChampObject storedObject = executeStoreObject(object, transaction);
460     
461     if(storedObject != null) {
462       
463       logOrEnqueueEvent(ChampEvent.builder()
464                                     .operation(ChampOperation.STORE)
465                                     .entity(storedObject)
466                                     .build(), 
467                         transaction);
468     }
469     
470     return storedObject;
471   }
472   
473   @Override
474   public ChampObject replaceObject(ChampObject object)
475       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
476
477     return replaceObject(object, Optional.empty());
478   }
479   
480   @Override
481   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
482       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
483     
484     ChampObject replacedObject = executeReplaceObject(object, transaction);
485     
486     if(replacedObject != null) {
487       
488       logOrEnqueueEvent(ChampEvent.builder()
489                                   .operation(ChampOperation.REPLACE)
490                                   .entity(replacedObject)
491                                   .build(), 
492                         transaction);
493     }
494     
495     return replacedObject;
496   }
497   
498   @Override
499   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
500     deleteObject(key, Optional.empty());
501   }
502   
503   @Override
504   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
505
506     // Retrieve the object that we are deleting before it's gone, so that we can 
507     // report it to the event stream.
508     Optional<ChampObject> objectToDelete = Optional.empty();
509     try {
510       objectToDelete = retrieveObject(key, transaction);
511       
512     } catch (ChampUnmarshallingException e) {
513       logger.error("Unable to generate delete object log: " + e.getMessage());
514     }
515     
516     executeDeleteObject(key, transaction);
517     
518     if(objectToDelete.isPresent()) {
519       // Update the event stream with the current operation.
520       logOrEnqueueEvent(ChampEvent.builder()
521                                   .operation(ChampOperation.DELETE)
522                                   .entity(objectToDelete.get())
523                                   .build(),
524                         transaction);
525     }
526   }
527  
528   @Override
529   public ChampRelationship storeRelationship(ChampRelationship relationship)
530       throws ChampUnmarshallingException, 
531              ChampMarshallingException, 
532              ChampObjectNotExistsException, 
533              ChampSchemaViolationException, 
534              ChampRelationshipNotExistsException, ChampTransactionException {  
535       return storeRelationship(relationship, Optional.empty());
536   }
537   
538   @Override
539   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
540       throws ChampUnmarshallingException, 
541              ChampMarshallingException, 
542              ChampObjectNotExistsException, 
543              ChampSchemaViolationException, 
544              ChampRelationshipNotExistsException, ChampTransactionException {  
545
546     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
547     
548     if(storedRelationship != null) {
549       
550       // Update the event stream with the current operation.
551       logOrEnqueueEvent(ChampEvent.builder()
552                                   .operation(ChampOperation.STORE)
553                                   .entity(storedRelationship)
554                                   .build(),
555                         transaction);
556     }
557     
558     return storedRelationship;
559   }
560
561   @Override
562   public ChampRelationship replaceRelationship(ChampRelationship relationship)
563       throws ChampUnmarshallingException, 
564              ChampMarshallingException, 
565              ChampSchemaViolationException, 
566              ChampRelationshipNotExistsException, ChampTransactionException { 
567     return replaceRelationship(relationship, Optional.empty());
568   }
569   
570   @Override
571   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
572       throws ChampUnmarshallingException, 
573              ChampMarshallingException, 
574              ChampSchemaViolationException, 
575              ChampRelationshipNotExistsException, ChampTransactionException { 
576
577     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
578     
579     if(replacedRelationship != null) {
580       
581       // Update the event stream with the current operation.
582       logOrEnqueueEvent(ChampEvent.builder()
583                                   .operation(ChampOperation.REPLACE)
584                                   .entity(replacedRelationship)
585                                   .build(),
586                         transaction);
587     }
588     
589     return replacedRelationship;
590   }
591   
592   @Override
593   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
594     deleteRelationship(relationship, Optional.empty());
595   }
596   
597   @Override
598   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
599
600     executeDeleteRelationship(relationship, transaction);
601     
602     // Update the event stream with the current operation.
603     logOrEnqueueEvent(ChampEvent.builder()
604                                 .operation(ChampOperation.DELETE)
605                                 .entity(relationship)
606                                 .build(),
607                       transaction);
608   }
609   
610   @Override
611   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
612     return storePartition(partition, Optional.empty());
613   }
614   
615   @Override
616   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
617
618     ChampPartition storedPartition = executeStorePartition(partition, transaction);
619     
620     if(storedPartition != null) {
621       
622       // Update the event stream with the current operation.
623       logOrEnqueueEvent(ChampEvent.builder()
624                                   .operation(ChampOperation.STORE)
625                                   .entity(storedPartition)
626                                   .build(),
627                         transaction);
628     }
629     
630     return storedPartition;
631   }
632   
633   @Override
634   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
635     deletePartition(graph, Optional.empty());
636   }
637   
638   @Override
639   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
640
641     executeDeletePartition(graph, transaction);
642     
643     // Update the event stream with the current operation.
644     logOrEnqueueEvent(ChampEvent.builder()
645                                 .operation(ChampOperation.DELETE)
646                                 .entity(graph)
647                                 .build(),
648                       transaction);
649   }
650   
651   @Override
652   public void storeObjectIndex(ChampObjectIndex index) {
653
654     executeStoreObjectIndex(index);
655     
656     // Update the event stream with the current operation.
657     logEvent(ChampEvent.builder()
658                   .operation(ChampOperation.STORE)
659                   .entity(index)
660                   .build());
661   }
662   
663   
664   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
665     
666     // Retrieve the index that we are deleting before it's gone, so that we can 
667     // report it to the event stream.
668     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
669     
670     executeDeleteObjectIndex(indexName);
671     
672     if(indexToDelete.isPresent()) {
673       // Update the event stream with the current operation.
674       logEvent(ChampEvent.builder()
675                     .operation(ChampOperation.DELETE)
676                     .entity(indexToDelete.get()) 
677                     .build());
678     }
679   }
680   
681   
682   public void storeRelationshipIndex(ChampRelationshipIndex index) {
683
684     executeStoreRelationshipIndex(index);
685     
686     // Update the event stream with the current operation.
687     logEvent(ChampEvent.builder()
688                   .operation(ChampOperation.STORE)
689                   .entity(index) 
690                   .build());
691   }
692   
693   
694   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
695
696     // Retrieve the index that we are deleting before it's gone, so that we can 
697     // report it to the event stream.
698     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
699     
700     executeDeleteRelationshipIndex(indexName);
701     
702     if(indexToDelete.isPresent()) {
703       // Update the event stream with the current operation.
704       logEvent(ChampEvent.builder()
705                     .operation(ChampOperation.DELETE)
706                     .entity(indexToDelete.get()) 
707                     .build());
708     }
709   }
710   
711   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
712     
713     if(!transaction.isPresent()) {
714       // Update the event stream with the current operation.
715       logEvent(event);
716     } else {
717
718       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
719       event.setDbTransactionId ( transaction.get ().id () );
720       transaction.get().logEvent(event);
721     }
722   }
723   
724   /**
725    * Submits an event to be published to the event stream.
726    * 
727    * @param anEvent - The event to be published.
728    */
729   public void logEvent(ChampEvent anEvent) {
730     
731     if(eventQueue == null) {
732       return;
733     }
734     
735     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
736     if(logger.isDebugEnabled()) {
737       logger.debug("Event payload: " + anEvent.toString());
738     }
739     
740     // Try to submit the event to be published to the event bus.
741     if(!eventQueue.offer(anEvent)) {
742       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
743     }
744   }
745   
746   
747   /**
748    * This class implements the worker threads for our thread pool which are responsible for 
749    * pulling the next outgoing event from the internal buffer and forwarding them to the event
750    * bus client. 
751    * <p>
752    * Each publish operation is performed synchronously, so that the thread will only move on
753    * to the next available event once it has actually published the current event to the bus.
754    */
755   private class EventPublisherWorker implements Runnable {
756
757     /** Partition key to use when publishing events to the event stream.  We WANT all events
758      *  to go to a single partition, so we are just using a hard-coded key for every event. */
759     private static final String EVENTS_PARTITION_KEY = "champEventKey";
760     
761     
762     @Override
763     public void run() {
764       
765       while(true) { 
766         
767         ChampEvent event = null;
768         try {
769           
770           // Get the next event to be published from the queue.
771           event = eventQueue.take();
772                
773         } catch (InterruptedException e) {
774          
775           // Restore the interrupted status.
776           Thread.currentThread().interrupt();
777         }
778         
779         // Try publishing the event to the event bus.  This call will block
780         // until 
781         try {
782           producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
783           
784         } catch (Exception e) {
785   
786           logger.error("Failed to publish event to event bus: " + e.getMessage());
787         }
788       }
789     }
790   }
791 }