Merge "Fix handling of InterruptedException in AAI-Champ"
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59
60
61 /**
62  * This class provides the hooks to allow Champ operations to be logged to an event
63  * stream.
64  */
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
66
67   public static final String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
68   public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
69   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
70   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
71   public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
72
73   /** Pool of worker threads that do the work of publishing the events to the event bus. */
74   protected ThreadPoolExecutor publisherPool;
75
76   /** Client used for publishing events to the event bus. */
77   protected EventPublisher producer;
78
79   /** Internal queue where outgoing events will be buffered until they can be serviced by
80    *  the event publisher worker threads. */
81   protected BlockingQueue<ChampEvent> eventQueue;
82
83   /** Number of events that can be queued up for publication before we begin dropping
84    *  events. */
85   private Integer eventQueueCapacity;
86
87   /** Number of event publisher worker threads. */
88   private Integer eventStreamPublisherPoolSize;
89
90   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
91
92
93   /**
94    * Create a new instance of the AbstractLoggingChampGraph.
95    *
96    * @param properties - Set of configuration properties for this graph instance.
97    */
98   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
99
100     // Extract the necessary parameters from the configuration properties.
101     configure(properties);
102
103     // Make sure we were passed an event producer as one of our properties, otherwise
104     // there is really nothing more we can do...
105     if(producer == null) {
106       logger.error("No event stream producer was supplied.");
107       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
108       return;
109     }
110
111     // Create the blocking queue that we will use to buffer events that we want
112     // published to the event bus.
113     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
114
115     // Create the executor pool that will do the work of publishing events to the event bus.
116     publisherPool =
117         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
118             new ProducerWorkerThreadFactory());
119
120     try {
121
122       // Start up the producer worker threads.
123       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
124         publisherPool.submit(new EventPublisherWorker());
125       }
126
127     } catch (Exception e) {
128
129       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
130       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
131       return;
132     }
133   }
134
135
136
137   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
138   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
139   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
140   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
141   @Override
142   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
143   @Override
144   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
145   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
146   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
147   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
148
149   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
150
151   /**
152     * Creates or updates a vertex in the graph data store.
153     * <p>
154     * If a transaction context is not provided, then a transaction will be automatically
155     * created and committed for this operation only, otherwise, the supplied transaction
156     * will be used and it will be up to the caller to commit the transaction at its
157     * discretion.
158     *
159     * @param object      - The vertex to be created or updated.
160     * @param transaction - Optional transaction context to perform the operation in.
161     *
162     * @return - The vertex, as created, marshaled as a {@link ChampObject}
163     *
164     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
165     *                                         into the backend representation
166     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
167     *                                         by {@link ChampGraph#retrieveSchema}
168     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
169     *                                         is not present or object not found in the graph
170     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
171     */
172   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
173
174   /**
175    * Updates an existing vertex in the graph store.
176    * <p>
177    * If a transaction context is not provided, then a transaction will be automatically
178    * created and committed for this operation only, otherwise, the supplied transaction
179    * will be used and it will be up to the caller to commit the transaction at its
180    * discretion.
181    *
182    * @param object      - The vertex to be created or updated.
183    * @param transaction - Optional transaction context to perform the operation in.
184    *
185    * @return - The updated vertex, marshaled as a {@link ChampObject}
186    *
187    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
188    *                                         the backend representation
189    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
190    *                                         by {@link ChampGraph#retrieveSchema}
191    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
192    *                                         is not present or object not found in the graph
193    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
194    */
195   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
196
197   /**
198    * Deletes an existing vertex from the graph store.
199    * <p>
200    * If a transaction context is not provided, then a transaction will be automatically
201    * created and committed for this operation only, otherwise, the supplied transaction
202    * will be used and it will be up to the caller to commit the transaction at its
203    * discretion.
204    *
205    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
206    * @param transaction - Optional transaction context to perform the operation in.
207    *
208    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
209    *                                         is not present or object not found in the graph
210    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
211    */
212   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
213
214   /**
215    * Creates or updates an edge in the graph data store.
216    * <p>
217    * If a transaction context is not provided, then a transaction will be automatically
218    * created and committed for this operation only, otherwise, the supplied transaction
219    * will be used and it will be up to the caller to commit the transaction at its
220    * discretion.
221    *
222    * @param relationship - The ChampRelationship that you wish to store in the graph
223    * @param transaction  - Optional transaction context to perform the operation in.
224    *
225    * @return - The {@link ChampRelationship} as it was stored.
226    *
227    * @throws ChampUnmarshallingException         - If the edge which was created could not be
228    *                                               unmarshalled into a ChampRelationship
229    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
230    *                                               marshalled into the backend representation
231    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
232    *                                               by this relationship does not exist in the graph
233    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
234    *                                               specifed by {@link ChampGraph#retrieveSchema}
235    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
236    *                                               but the object cannot be found in the graph
237    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
238    */
239   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
240
241   /**
242    * Replaces an existing edge in the graph data store.
243    * <p>
244    * If a transaction context is not provided, then a transaction will be automatically
245    * created and committed for this operation only, otherwise, the supplied transaction
246    * will be used and it will be up to the caller to commit the transaction at its
247    * discretion.
248    *
249    * @param relationship  - The ChampRelationship that you wish to replace in the graph
250    * @param transaction   - Optional transaction context to perform the operation in.
251    *
252    * @return - The {@link ChampRelationship} as it was stored.
253    *
254    * @throws ChampUnmarshallingException         - If the edge which was created could not be
255    *                                               unmarshalled into a ChampRelationship
256    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
257    *                                               marshalled into the backend representation
258    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
259    *                                               specifed by {@link ChampGraph#retrieveSchema}
260    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
261    *                                               but the object cannot be found in the graph
262    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
263    */
264   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
265
266   /**
267    * Removes an edge from the graph data store.
268    * <p>
269    * If a transaction context is not provided, then a transaction will be automatically
270    * created and committed for this operation only, otherwise, the supplied transaction
271    * will be used and it will be up to the caller to commit the transaction at its
272    * discretion.
273    *
274    * @param relationship - The ChampRelationship that you wish to remove from the graph.
275    * @param transaction  - Optional transaction context to perform the operation in.
276    *
277    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
278    *                                               but the object cannot be found in the graph
279    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
280    */
281   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
282
283   /**
284    * Create or update a {@link ChampPartition}.
285    * <p>
286    * If a transaction context is not provided, then a transaction will be automatically
287    * created and committed for this operation only, otherwise, the supplied transaction
288    * will be used and it will be up to the caller to commit the transaction at its
289    * discretion.
290    *
291    * @param partition   - The ChampPartition that you wish to create or update in the graph.
292    * @param transaction - Optional transaction context to perform the operation in.
293    *
294    * @return - The {@link ChampPartition} as it was stored.
295    *
296    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
297    *                                               specifed by {@link ChampGraph#retrieveSchema}
298    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
299    *                                               but the object cannot be found in the graph
300    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
301    *                                               marshalled into the backend representation
302    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
303    *                                               by this relationship does not exist in the graph
304    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
305    */
306   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
307
308   /**
309    * Removes a partition from the graph.
310    * <p>
311    * If a transaction context is not provided, then a transaction will be automatically
312    * created and committed for this operation only, otherwise, the supplied transaction
313    * will be used and it will be up to the caller to commit the transaction at its
314    * discretion.
315    *
316    * @param graph       - The partition to be removed.
317    * @param transaction - Optional transaction context to perform the operation in.
318    *
319    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
320    */
321   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
322
323   /**
324    * Create or update an object index in the graph.
325    *
326    * @param index       - The object index to be created/updated.
327    */
328   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
329   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
330   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
331   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
332   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
333   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
334   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
335   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
336   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
337   public abstract ChampSchema                      retrieveSchema();
338   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
339   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
340   public abstract void                             deleteSchema();
341   public abstract ChampCapabilities                capabilities();
342
343
344   /**
345    * Thread factory for the event producer workers.
346    */
347   private class ProducerWorkerThreadFactory implements ThreadFactory {
348
349     private AtomicInteger threadNumber = new AtomicInteger(1);
350
351     public Thread newThread(Runnable r) {
352       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
353     }
354   }
355
356
357   /**
358    * Process the configuration properties supplied for this graph instance.
359    *
360    * @param properties - Configuration parameters.
361    */
362   private void configure(Map<String, Object> properties) {
363
364     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
365
366     eventQueueCapacity =
367         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
368     eventStreamPublisherPoolSize =
369         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
370   }
371
372
373   public void setProducer(EventPublisher aProducer) {
374
375     producer = aProducer;
376   }
377
378   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
379
380     if(properties.containsKey(property)) {
381       return properties.get(property);
382     } else {
383       return defaultValue;
384     }
385   }
386
387   @Override
388   public void shutdown() {
389
390     if(publisherPool != null) {
391       publisherPool.shutdown();
392
393       try {
394         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
395       } catch (InterruptedException e) {
396         logger.warn("Termination interrupted");
397         Thread.currentThread().interrupt();
398       }
399     }
400
401     if(producer != null) {
402
403       try {
404         producer.close();
405
406       } catch (Exception e) {
407         logger.error("Failed to stop event stream producer: " + e.getMessage());
408       }
409     }
410   }
411
412   @Override
413   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
414
415     try {
416
417       // Commit the transaction.
418       transaction.commit();
419
420     } catch (ChampTransactionException e) {
421
422       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
423
424       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
425       for(ChampEvent event : enqueuedEvents) {
426
427         logger.debug("Graph event " + event.toString() + " not published.");
428       }
429       throw e;
430     }
431
432     // Now that the transaction has been successfully committed, we need
433     // to log the events that were produced within that transaction's
434     // context.
435     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
436     for(ChampEvent event : enqueuedEvents) {
437       logEvent(event);
438     }
439   }
440
441   @Override
442   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
443
444     // Rollback the transaction.
445     transaction.rollback();
446   }
447
448   @Override
449   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
450     return storeObject(object, Optional.empty());
451   }
452
453   @Override
454   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
455
456     ChampObject storedObject = executeStoreObject(object, transaction);
457
458     if(storedObject != null) {
459
460       logOrEnqueueEvent(ChampEvent.builder()
461                                     .operation(ChampOperation.STORE)
462                                     .entity(storedObject)
463                                     .build(),
464                         transaction);
465     }
466
467     return storedObject;
468   }
469
470   @Override
471   public ChampObject replaceObject(ChampObject object)
472       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
473
474     return replaceObject(object, Optional.empty());
475   }
476
477   @Override
478   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
479       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
480
481     ChampObject replacedObject = executeReplaceObject(object, transaction);
482
483     if(replacedObject != null) {
484
485       logOrEnqueueEvent(ChampEvent.builder()
486                                   .operation(ChampOperation.REPLACE)
487                                   .entity(replacedObject)
488                                   .build(),
489                         transaction);
490     }
491
492     return replacedObject;
493   }
494
495   @Override
496   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
497     deleteObject(key, Optional.empty());
498   }
499
500   @Override
501   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
502
503     // Retrieve the object that we are deleting before it's gone, so that we can
504     // report it to the event stream.
505     Optional<ChampObject> objectToDelete = Optional.empty();
506     try {
507       objectToDelete = retrieveObject(key, transaction);
508
509     } catch (ChampUnmarshallingException e) {
510       logger.error("Unable to generate delete object log: " + e.getMessage());
511     }
512
513     executeDeleteObject(key, transaction);
514
515     if(objectToDelete.isPresent()) {
516       // Update the event stream with the current operation.
517       logOrEnqueueEvent(ChampEvent.builder()
518                                   .operation(ChampOperation.DELETE)
519                                   .entity(objectToDelete.get())
520                                   .build(),
521                         transaction);
522     }
523   }
524
525   @Override
526   public ChampRelationship storeRelationship(ChampRelationship relationship)
527       throws ChampUnmarshallingException,
528              ChampMarshallingException,
529              ChampObjectNotExistsException,
530              ChampSchemaViolationException,
531              ChampRelationshipNotExistsException, ChampTransactionException {
532       return storeRelationship(relationship, Optional.empty());
533   }
534
535   @Override
536   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
537       throws ChampUnmarshallingException,
538              ChampMarshallingException,
539              ChampObjectNotExistsException,
540              ChampSchemaViolationException,
541              ChampRelationshipNotExistsException, ChampTransactionException {
542
543     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
544
545     if(storedRelationship != null) {
546
547       // Update the event stream with the current operation.
548       logOrEnqueueEvent(ChampEvent.builder()
549                                   .operation(ChampOperation.STORE)
550                                   .entity(storedRelationship)
551                                   .build(),
552                         transaction);
553     }
554
555     return storedRelationship;
556   }
557
558   @Override
559   public ChampRelationship replaceRelationship(ChampRelationship relationship)
560       throws ChampUnmarshallingException,
561              ChampMarshallingException,
562              ChampSchemaViolationException,
563              ChampRelationshipNotExistsException, ChampTransactionException {
564     return replaceRelationship(relationship, Optional.empty());
565   }
566
567   @Override
568   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
569       throws ChampUnmarshallingException,
570              ChampMarshallingException,
571              ChampSchemaViolationException,
572              ChampRelationshipNotExistsException, ChampTransactionException {
573
574     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
575
576     if(replacedRelationship != null) {
577
578       // Update the event stream with the current operation.
579       logOrEnqueueEvent(ChampEvent.builder()
580                                   .operation(ChampOperation.REPLACE)
581                                   .entity(replacedRelationship)
582                                   .build(),
583                         transaction);
584     }
585
586     return replacedRelationship;
587   }
588
589   @Override
590   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
591     deleteRelationship(relationship, Optional.empty());
592   }
593
594   @Override
595   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
596
597     executeDeleteRelationship(relationship, transaction);
598
599     // Update the event stream with the current operation.
600     logOrEnqueueEvent(ChampEvent.builder()
601                                 .operation(ChampOperation.DELETE)
602                                 .entity(relationship)
603                                 .build(),
604                       transaction);
605   }
606
607   @Override
608   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
609     return storePartition(partition, Optional.empty());
610   }
611
612   @Override
613   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
614
615     ChampPartition storedPartition = executeStorePartition(partition, transaction);
616
617     if(storedPartition != null) {
618
619       // Update the event stream with the current operation.
620       logOrEnqueueEvent(ChampEvent.builder()
621                                   .operation(ChampOperation.STORE)
622                                   .entity(storedPartition)
623                                   .build(),
624                         transaction);
625     }
626
627     return storedPartition;
628   }
629
630   @Override
631   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
632     deletePartition(graph, Optional.empty());
633   }
634
635   @Override
636   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
637
638     executeDeletePartition(graph, transaction);
639
640     // Update the event stream with the current operation.
641     logOrEnqueueEvent(ChampEvent.builder()
642                                 .operation(ChampOperation.DELETE)
643                                 .entity(graph)
644                                 .build(),
645                       transaction);
646   }
647
648   @Override
649   public void storeObjectIndex(ChampObjectIndex index) {
650
651     executeStoreObjectIndex(index);
652
653     // Update the event stream with the current operation.
654     logEvent(ChampEvent.builder()
655                   .operation(ChampOperation.STORE)
656                   .entity(index)
657                   .build());
658   }
659
660
661   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
662
663     // Retrieve the index that we are deleting before it's gone, so that we can
664     // report it to the event stream.
665     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
666
667     executeDeleteObjectIndex(indexName);
668
669     if(indexToDelete.isPresent()) {
670       // Update the event stream with the current operation.
671       logEvent(ChampEvent.builder()
672                     .operation(ChampOperation.DELETE)
673                     .entity(indexToDelete.get())
674                     .build());
675     }
676   }
677
678
679   public void storeRelationshipIndex(ChampRelationshipIndex index) {
680
681     executeStoreRelationshipIndex(index);
682
683     // Update the event stream with the current operation.
684     logEvent(ChampEvent.builder()
685                   .operation(ChampOperation.STORE)
686                   .entity(index)
687                   .build());
688   }
689
690
691   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
692
693     // Retrieve the index that we are deleting before it's gone, so that we can
694     // report it to the event stream.
695     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
696
697     executeDeleteRelationshipIndex(indexName);
698
699     if(indexToDelete.isPresent()) {
700       // Update the event stream with the current operation.
701       logEvent(ChampEvent.builder()
702                     .operation(ChampOperation.DELETE)
703                     .entity(indexToDelete.get())
704                     .build());
705     }
706   }
707
708   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
709
710     if(!transaction.isPresent()) {
711       // Update the event stream with the current operation.
712       logEvent(event);
713     } else {
714
715       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
716       event.setDbTransactionId ( transaction.get ().id () );
717       transaction.get().logEvent(event);
718     }
719   }
720
721   /**
722    * Submits an event to be published to the event stream.
723    *
724    * @param anEvent - The event to be published.
725    */
726   public void logEvent(ChampEvent anEvent) {
727
728     if(eventQueue == null) {
729       return;
730     }
731
732     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
733     if(logger.isDebugEnabled()) {
734       logger.debug("Event payload: " + anEvent.toString());
735     }
736
737     // Try to submit the event to be published to the event bus.
738     if(!eventQueue.offer(anEvent)) {
739       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
740     }
741   }
742
743
744   /**
745    * This class implements the worker threads for our thread pool which are responsible for
746    * pulling the next outgoing event from the internal buffer and forwarding them to the event
747    * bus client.
748    * <p>
749    * Each publish operation is performed synchronously, so that the thread will only move on
750    * to the next available event once it has actually published the current event to the bus.
751    */
752   private class EventPublisherWorker implements Runnable {
753
754     /** Partition key to use when publishing events to the event stream.  We WANT all events
755      *  to go to a single partition, so we are just using a hard-coded key for every event. */
756     private static final String EVENTS_PARTITION_KEY = "champEventKey";
757
758
759     @Override
760     public void run() {
761
762       while(true) {
763         ChampEvent event = null;
764         try {
765
766           // Get the next event to be published from the queue.
767           event = eventQueue.take();
768
769         } catch (InterruptedException e) {
770
771           // Restore the interrupted status.
772           Thread.currentThread().interrupt();
773         }
774
775         // Create new envelope containing an event header and ChampEvent
776         ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
777
778         // Try publishing the event to the event bus.  This call will block until
779         try {
780           producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
781
782         } catch (Exception e) {
783
784           logger.error("Failed to publish event to event bus: " + e.getMessage());
785         }
786       }
787     }
788   }
789 }