change order in class for java convention
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59
60
61 /**
62  * This class provides the hooks to allow Champ operations to be logged to an event
63  * stream.
64  */
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
66
67   public static final String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
68   public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
69   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
70   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
71   public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
72
73   /** Pool of worker threads that do the work of publishing the events to the event bus. */
74   protected ThreadPoolExecutor publisherPool;
75
76   /** Client used for publishing events to the event bus. */
77   protected EventPublisher producer;
78
79   /** Internal queue where outgoing events will be buffered until they can be serviced by
80    *  the event publisher worker threads. */
81   protected BlockingQueue<ChampEvent> eventQueue;
82
83   /** Number of events that can be queued up for publication before we begin dropping
84    *  events. */
85   private Integer eventQueueCapacity;
86
87   /** Number of event publisher worker threads. */
88   private Integer eventStreamPublisherPoolSize;
89
90   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
91
92
93   /**
94    * Create a new instance of the AbstractLoggingChampGraph.
95    *
96    * @param properties - Set of configuration properties for this graph instance.
97    */
98   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
99
100     // Extract the necessary parameters from the configuration properties.
101     configure(properties);
102
103     // Make sure we were passed an event producer as one of our properties, otherwise
104     // there is really nothing more we can do...
105     if(producer == null) {
106       logger.error("No event stream producer was supplied.");
107       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
108       return;
109     }
110
111     // Create the blocking queue that we will use to buffer events that we want
112     // published to the event bus.
113     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
114
115     // Create the executor pool that will do the work of publishing events to the event bus.
116     publisherPool =
117         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
118             new ProducerWorkerThreadFactory());
119
120     try {
121
122       // Start up the producer worker threads.
123       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
124         publisherPool.submit(new EventPublisherWorker());
125       }
126
127     } catch (Exception e) {
128
129       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
130       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
131       return;
132     }
133   }
134
135
136
137   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
138   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
139   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
140   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
141   @Override
142   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
143   @Override
144   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
145   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
146   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
147   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
148
149   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
150
151   /**
152     * Creates or updates a vertex in the graph data store.
153     * <p>
154     * If a transaction context is not provided, then a transaction will be automatically
155     * created and committed for this operation only, otherwise, the supplied transaction
156     * will be used and it will be up to the caller to commit the transaction at its
157     * discretion.
158     *
159     * @param object      - The vertex to be created or updated.
160     * @param transaction - Optional transaction context to perform the operation in.
161     *
162     * @return - The vertex, as created, marshaled as a {@link ChampObject}
163     *
164     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
165     *                                         into the backend representation
166     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
167     *                                         by {@link ChampGraph#retrieveSchema}
168     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
169     *                                         is not present or object not found in the graph
170     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
171     */
172   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
173
174   /**
175    * Updates an existing vertex in the graph store.
176    * <p>
177    * If a transaction context is not provided, then a transaction will be automatically
178    * created and committed for this operation only, otherwise, the supplied transaction
179    * will be used and it will be up to the caller to commit the transaction at its
180    * discretion.
181    *
182    * @param object      - The vertex to be created or updated.
183    * @param transaction - Optional transaction context to perform the operation in.
184    *
185    * @return - The updated vertex, marshaled as a {@link ChampObject}
186    *
187    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
188    *                                         the backend representation
189    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
190    *                                         by {@link ChampGraph#retrieveSchema}
191    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
192    *                                         is not present or object not found in the graph
193    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
194    */
195   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
196
197   /**
198    * Deletes an existing vertex from the graph store.
199    * <p>
200    * If a transaction context is not provided, then a transaction will be automatically
201    * created and committed for this operation only, otherwise, the supplied transaction
202    * will be used and it will be up to the caller to commit the transaction at its
203    * discretion.
204    *
205    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
206    * @param transaction - Optional transaction context to perform the operation in.
207    *
208    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
209    *                                         is not present or object not found in the graph
210    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
211    */
212   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
213
214   /**
215    * Creates or updates an edge in the graph data store.
216    * <p>
217    * If a transaction context is not provided, then a transaction will be automatically
218    * created and committed for this operation only, otherwise, the supplied transaction
219    * will be used and it will be up to the caller to commit the transaction at its
220    * discretion.
221    *
222    * @param relationship - The ChampRelationship that you wish to store in the graph
223    * @param transaction  - Optional transaction context to perform the operation in.
224    *
225    * @return - The {@link ChampRelationship} as it was stored.
226    *
227    * @throws ChampUnmarshallingException         - If the edge which was created could not be
228    *                                               unmarshalled into a ChampRelationship
229    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
230    *                                               marshalled into the backend representation
231    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
232    *                                               by this relationship does not exist in the graph
233    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
234    *                                               specifed by {@link ChampGraph#retrieveSchema}
235    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
236    *                                               but the object cannot be found in the graph
237    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
238    */
239   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
240
241   /**
242    * Replaces an existing edge in the graph data store.
243    * <p>
244    * If a transaction context is not provided, then a transaction will be automatically
245    * created and committed for this operation only, otherwise, the supplied transaction
246    * will be used and it will be up to the caller to commit the transaction at its
247    * discretion.
248    *
249    * @param relationship  - The ChampRelationship that you wish to replace in the graph
250    * @param transaction   - Optional transaction context to perform the operation in.
251    *
252    * @return - The {@link ChampRelationship} as it was stored.
253    *
254    * @throws ChampUnmarshallingException         - If the edge which was created could not be
255    *                                               unmarshalled into a ChampRelationship
256    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
257    *                                               marshalled into the backend representation
258    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
259    *                                               specifed by {@link ChampGraph#retrieveSchema}
260    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
261    *                                               but the object cannot be found in the graph
262    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
263    */
264   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
265
266   /**
267    * Removes an edge from the graph data store.
268    * <p>
269    * If a transaction context is not provided, then a transaction will be automatically
270    * created and committed for this operation only, otherwise, the supplied transaction
271    * will be used and it will be up to the caller to commit the transaction at its
272    * discretion.
273    *
274    * @param relationship - The ChampRelationship that you wish to remove from the graph.
275    * @param transaction  - Optional transaction context to perform the operation in.
276    *
277    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
278    *                                               but the object cannot be found in the graph
279    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
280    */
281   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
282
283   /**
284    * Create or update a {@link ChampPartition}.
285    * <p>
286    * If a transaction context is not provided, then a transaction will be automatically
287    * created and committed for this operation only, otherwise, the supplied transaction
288    * will be used and it will be up to the caller to commit the transaction at its
289    * discretion.
290    *
291    * @param partition   - The ChampPartition that you wish to create or update in the graph.
292    * @param transaction - Optional transaction context to perform the operation in.
293    *
294    * @return - The {@link ChampPartition} as it was stored.
295    *
296    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
297    *                                               specifed by {@link ChampGraph#retrieveSchema}
298    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
299    *                                               but the object cannot be found in the graph
300    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
301    *                                               marshalled into the backend representation
302    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
303    *                                               by this relationship does not exist in the graph
304    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
305    */
306   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
307
308   /**
309    * Removes a partition from the graph.
310    * <p>
311    * If a transaction context is not provided, then a transaction will be automatically
312    * created and committed for this operation only, otherwise, the supplied transaction
313    * will be used and it will be up to the caller to commit the transaction at its
314    * discretion.
315    *
316    * @param graph       - The partition to be removed.
317    * @param transaction - Optional transaction context to perform the operation in.
318    *
319    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
320    */
321   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
322
323   /**
324    * Create or update an object index in the graph.
325    *
326    * @param index       - The object index to be created/updated.
327    */
328   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
329   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
330   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
331   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
332   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
333   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
334   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
335   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
336   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
337   public abstract ChampSchema                      retrieveSchema();
338   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
339   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
340   public abstract void                             deleteSchema();
341   public abstract ChampCapabilities                capabilities();
342
343
344   /**
345    * Thread factory for the event producer workers.
346    */
347   private class ProducerWorkerThreadFactory implements ThreadFactory {
348
349     private AtomicInteger threadNumber = new AtomicInteger(1);
350
351     public Thread newThread(Runnable r) {
352       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
353     }
354   }
355
356
357   /**
358    * Process the configuration properties supplied for this graph instance.
359    *
360    * @param properties - Configuration parameters.
361    */
362   private void configure(Map<String, Object> properties) {
363
364     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
365
366     eventQueueCapacity =
367         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
368     eventStreamPublisherPoolSize =
369         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
370   }
371
372
373   public void setProducer(EventPublisher aProducer) {
374
375     producer = aProducer;
376   }
377
378   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
379
380     if(properties.containsKey(property)) {
381       return properties.get(property);
382     } else {
383       return defaultValue;
384     }
385   }
386
387   @Override
388   public void shutdown() {
389
390     if(publisherPool != null) {
391       publisherPool.shutdown();
392
393       try {
394         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
395       } catch (InterruptedException e) {}
396     }
397
398     if(producer != null) {
399
400       try {
401         producer.close();
402
403       } catch (Exception e) {
404         logger.error("Failed to stop event stream producer: " + e.getMessage());
405       }
406     }
407   }
408
409   @Override
410   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
411
412     try {
413
414       // Commit the transaction.
415       transaction.commit();
416
417     } catch (ChampTransactionException e) {
418
419       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
420
421       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
422       for(ChampEvent event : enqueuedEvents) {
423
424         logger.debug("Graph event " + event.toString() + " not published.");
425       }
426       throw e;
427     }
428
429     // Now that the transaction has been successfully committed, we need
430     // to log the events that were produced within that transaction's
431     // context.
432     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
433     for(ChampEvent event : enqueuedEvents) {
434       logEvent(event);
435     }
436   }
437
438   @Override
439   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
440
441     // Rollback the transaction.
442     transaction.rollback();
443   }
444
445   @Override
446   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
447     return storeObject(object, Optional.empty());
448   }
449
450   @Override
451   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
452
453     ChampObject storedObject = executeStoreObject(object, transaction);
454
455     if(storedObject != null) {
456
457       logOrEnqueueEvent(ChampEvent.builder()
458                                     .operation(ChampOperation.STORE)
459                                     .entity(storedObject)
460                                     .build(),
461                         transaction);
462     }
463
464     return storedObject;
465   }
466
467   @Override
468   public ChampObject replaceObject(ChampObject object)
469       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
470
471     return replaceObject(object, Optional.empty());
472   }
473
474   @Override
475   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
476       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
477
478     ChampObject replacedObject = executeReplaceObject(object, transaction);
479
480     if(replacedObject != null) {
481
482       logOrEnqueueEvent(ChampEvent.builder()
483                                   .operation(ChampOperation.REPLACE)
484                                   .entity(replacedObject)
485                                   .build(),
486                         transaction);
487     }
488
489     return replacedObject;
490   }
491
492   @Override
493   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
494     deleteObject(key, Optional.empty());
495   }
496
497   @Override
498   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
499
500     // Retrieve the object that we are deleting before it's gone, so that we can
501     // report it to the event stream.
502     Optional<ChampObject> objectToDelete = Optional.empty();
503     try {
504       objectToDelete = retrieveObject(key, transaction);
505
506     } catch (ChampUnmarshallingException e) {
507       logger.error("Unable to generate delete object log: " + e.getMessage());
508     }
509
510     executeDeleteObject(key, transaction);
511
512     if(objectToDelete.isPresent()) {
513       // Update the event stream with the current operation.
514       logOrEnqueueEvent(ChampEvent.builder()
515                                   .operation(ChampOperation.DELETE)
516                                   .entity(objectToDelete.get())
517                                   .build(),
518                         transaction);
519     }
520   }
521
522   @Override
523   public ChampRelationship storeRelationship(ChampRelationship relationship)
524       throws ChampUnmarshallingException,
525              ChampMarshallingException,
526              ChampObjectNotExistsException,
527              ChampSchemaViolationException,
528              ChampRelationshipNotExistsException, ChampTransactionException {
529       return storeRelationship(relationship, Optional.empty());
530   }
531
532   @Override
533   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
534       throws ChampUnmarshallingException,
535              ChampMarshallingException,
536              ChampObjectNotExistsException,
537              ChampSchemaViolationException,
538              ChampRelationshipNotExistsException, ChampTransactionException {
539
540     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
541
542     if(storedRelationship != null) {
543
544       // Update the event stream with the current operation.
545       logOrEnqueueEvent(ChampEvent.builder()
546                                   .operation(ChampOperation.STORE)
547                                   .entity(storedRelationship)
548                                   .build(),
549                         transaction);
550     }
551
552     return storedRelationship;
553   }
554
555   @Override
556   public ChampRelationship replaceRelationship(ChampRelationship relationship)
557       throws ChampUnmarshallingException,
558              ChampMarshallingException,
559              ChampSchemaViolationException,
560              ChampRelationshipNotExistsException, ChampTransactionException {
561     return replaceRelationship(relationship, Optional.empty());
562   }
563
564   @Override
565   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
566       throws ChampUnmarshallingException,
567              ChampMarshallingException,
568              ChampSchemaViolationException,
569              ChampRelationshipNotExistsException, ChampTransactionException {
570
571     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
572
573     if(replacedRelationship != null) {
574
575       // Update the event stream with the current operation.
576       logOrEnqueueEvent(ChampEvent.builder()
577                                   .operation(ChampOperation.REPLACE)
578                                   .entity(replacedRelationship)
579                                   .build(),
580                         transaction);
581     }
582
583     return replacedRelationship;
584   }
585
586   @Override
587   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
588     deleteRelationship(relationship, Optional.empty());
589   }
590
591   @Override
592   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
593
594     executeDeleteRelationship(relationship, transaction);
595
596     // Update the event stream with the current operation.
597     logOrEnqueueEvent(ChampEvent.builder()
598                                 .operation(ChampOperation.DELETE)
599                                 .entity(relationship)
600                                 .build(),
601                       transaction);
602   }
603
604   @Override
605   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
606     return storePartition(partition, Optional.empty());
607   }
608
609   @Override
610   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
611
612     ChampPartition storedPartition = executeStorePartition(partition, transaction);
613
614     if(storedPartition != null) {
615
616       // Update the event stream with the current operation.
617       logOrEnqueueEvent(ChampEvent.builder()
618                                   .operation(ChampOperation.STORE)
619                                   .entity(storedPartition)
620                                   .build(),
621                         transaction);
622     }
623
624     return storedPartition;
625   }
626
627   @Override
628   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
629     deletePartition(graph, Optional.empty());
630   }
631
632   @Override
633   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
634
635     executeDeletePartition(graph, transaction);
636
637     // Update the event stream with the current operation.
638     logOrEnqueueEvent(ChampEvent.builder()
639                                 .operation(ChampOperation.DELETE)
640                                 .entity(graph)
641                                 .build(),
642                       transaction);
643   }
644
645   @Override
646   public void storeObjectIndex(ChampObjectIndex index) {
647
648     executeStoreObjectIndex(index);
649
650     // Update the event stream with the current operation.
651     logEvent(ChampEvent.builder()
652                   .operation(ChampOperation.STORE)
653                   .entity(index)
654                   .build());
655   }
656
657
658   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
659
660     // Retrieve the index that we are deleting before it's gone, so that we can
661     // report it to the event stream.
662     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
663
664     executeDeleteObjectIndex(indexName);
665
666     if(indexToDelete.isPresent()) {
667       // Update the event stream with the current operation.
668       logEvent(ChampEvent.builder()
669                     .operation(ChampOperation.DELETE)
670                     .entity(indexToDelete.get())
671                     .build());
672     }
673   }
674
675
676   public void storeRelationshipIndex(ChampRelationshipIndex index) {
677
678     executeStoreRelationshipIndex(index);
679
680     // Update the event stream with the current operation.
681     logEvent(ChampEvent.builder()
682                   .operation(ChampOperation.STORE)
683                   .entity(index)
684                   .build());
685   }
686
687
688   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
689
690     // Retrieve the index that we are deleting before it's gone, so that we can
691     // report it to the event stream.
692     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
693
694     executeDeleteRelationshipIndex(indexName);
695
696     if(indexToDelete.isPresent()) {
697       // Update the event stream with the current operation.
698       logEvent(ChampEvent.builder()
699                     .operation(ChampOperation.DELETE)
700                     .entity(indexToDelete.get())
701                     .build());
702     }
703   }
704
705   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
706
707     if(!transaction.isPresent()) {
708       // Update the event stream with the current operation.
709       logEvent(event);
710     } else {
711
712       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
713       event.setDbTransactionId ( transaction.get ().id () );
714       transaction.get().logEvent(event);
715     }
716   }
717
718   /**
719    * Submits an event to be published to the event stream.
720    *
721    * @param anEvent - The event to be published.
722    */
723   public void logEvent(ChampEvent anEvent) {
724
725     if(eventQueue == null) {
726       return;
727     }
728
729     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
730     if(logger.isDebugEnabled()) {
731       logger.debug("Event payload: " + anEvent.toString());
732     }
733
734     // Try to submit the event to be published to the event bus.
735     if(!eventQueue.offer(anEvent)) {
736       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
737     }
738   }
739
740
741   /**
742    * This class implements the worker threads for our thread pool which are responsible for
743    * pulling the next outgoing event from the internal buffer and forwarding them to the event
744    * bus client.
745    * <p>
746    * Each publish operation is performed synchronously, so that the thread will only move on
747    * to the next available event once it has actually published the current event to the bus.
748    */
749   private class EventPublisherWorker implements Runnable {
750
751     /** Partition key to use when publishing events to the event stream.  We WANT all events
752      *  to go to a single partition, so we are just using a hard-coded key for every event. */
753     private static final String EVENTS_PARTITION_KEY = "champEventKey";
754
755
756     @Override
757     public void run() {
758
759       while(true) {
760         ChampEvent event = null;
761         try {
762
763           // Get the next event to be published from the queue.
764           event = eventQueue.take();
765
766         } catch (InterruptedException e) {
767
768           // Restore the interrupted status.
769           Thread.currentThread().interrupt();
770         }
771
772         // Create new envelope containing an event header and ChampEvent
773         ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
774
775         // Try publishing the event to the event bus.  This call will block until
776         try {
777           producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
778
779         } catch (Exception e) {
780
781           logger.error("Failed to publish event to event bus: " + e.getMessage());
782         }
783       }
784     }
785   }
786 }