Fix handling of InterruptedException in AAI-Champ
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.event.envelope.ChampEventHeader;
41 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
43 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
46 import org.onap.aai.champcore.exceptions.ChampTransactionException;
47 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPartition;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.onap.aai.event.api.EventPublisher;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60
61
62 /**
63  * This class provides the hooks to allow Champ operations to be logged to an event
64  * stream.
65  */
66 public abstract class AbstractLoggingChampGraph implements ChampGraph {
67
68   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
69
70   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
71   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
72   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
73   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
74   @Override
75   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
76   @Override
77   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
78   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
79   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
81   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
82
83
84   /**
85     * Creates or updates a vertex in the graph data store.
86     * <p>
87     * If a transaction context is not provided, then a transaction will be automatically
88     * created and committed for this operation only, otherwise, the supplied transaction
89     * will be used and it will be up to the caller to commit the transaction at its
90     * discretion.
91     *
92     * @param object      - The vertex to be created or updated.
93     * @param transaction - Optional transaction context to perform the operation in.
94     *
95     * @return - The vertex, as created, marshaled as a {@link ChampObject}
96     *
97     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
98     *                                         into the backend representation
99     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
100     *                                         by {@link ChampGraph#retrieveSchema}
101     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
102     *                                         is not present or object not found in the graph
103     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
104     */
105   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
106
107   /**
108    * Updates an existing vertex in the graph store.
109    * <p>
110    * If a transaction context is not provided, then a transaction will be automatically
111    * created and committed for this operation only, otherwise, the supplied transaction
112    * will be used and it will be up to the caller to commit the transaction at its
113    * discretion.
114    *
115    * @param object      - The vertex to be created or updated.
116    * @param transaction - Optional transaction context to perform the operation in.
117    *
118    * @return - The updated vertex, marshaled as a {@link ChampObject}
119    *
120    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
121    *                                         the backend representation
122    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
123    *                                         by {@link ChampGraph#retrieveSchema}
124    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
125    *                                         is not present or object not found in the graph
126    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
127    */
128   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
129
130   /**
131    * Deletes an existing vertex from the graph store.
132    * <p>
133    * If a transaction context is not provided, then a transaction will be automatically
134    * created and committed for this operation only, otherwise, the supplied transaction
135    * will be used and it will be up to the caller to commit the transaction at its
136    * discretion.
137    *
138    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
139    * @param transaction - Optional transaction context to perform the operation in.
140    *
141    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
142    *                                         is not present or object not found in the graph
143    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
144    */
145   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
146
147   /**
148    * Creates or updates an edge in the graph data store.
149    * <p>
150    * If a transaction context is not provided, then a transaction will be automatically
151    * created and committed for this operation only, otherwise, the supplied transaction
152    * will be used and it will be up to the caller to commit the transaction at its
153    * discretion.
154    *
155    * @param relationship - The ChampRelationship that you wish to store in the graph
156    * @param transaction  - Optional transaction context to perform the operation in.
157    *
158    * @return - The {@link ChampRelationship} as it was stored.
159    *
160    * @throws ChampUnmarshallingException         - If the edge which was created could not be
161    *                                               unmarshalled into a ChampRelationship
162    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
163    *                                               marshalled into the backend representation
164    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
165    *                                               by this relationship does not exist in the graph
166    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
167    *                                               specifed by {@link ChampGraph#retrieveSchema}
168    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
169    *                                               but the object cannot be found in the graph
170    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
171    */
172   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
173
174   /**
175    * Replaces an existing edge in the graph data store.
176    * <p>
177    * If a transaction context is not provided, then a transaction will be automatically
178    * created and committed for this operation only, otherwise, the supplied transaction
179    * will be used and it will be up to the caller to commit the transaction at its
180    * discretion.
181    *
182    * @param relationship  - The ChampRelationship that you wish to replace in the graph
183    * @param transaction   - Optional transaction context to perform the operation in.
184    *
185    * @return - The {@link ChampRelationship} as it was stored.
186    *
187    * @throws ChampUnmarshallingException         - If the edge which was created could not be
188    *                                               unmarshalled into a ChampRelationship
189    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
190    *                                               marshalled into the backend representation
191    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
192    *                                               specifed by {@link ChampGraph#retrieveSchema}
193    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
194    *                                               but the object cannot be found in the graph
195    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
196    */
197   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
198
199   /**
200    * Removes an edge from the graph data store.
201    * <p>
202    * If a transaction context is not provided, then a transaction will be automatically
203    * created and committed for this operation only, otherwise, the supplied transaction
204    * will be used and it will be up to the caller to commit the transaction at its
205    * discretion.
206    *
207    * @param relationship - The ChampRelationship that you wish to remove from the graph.
208    * @param transaction  - Optional transaction context to perform the operation in.
209    *
210    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
211    *                                               but the object cannot be found in the graph
212    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
213    */
214   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
215
216   /**
217    * Create or update a {@link ChampPartition}.
218    * <p>
219    * If a transaction context is not provided, then a transaction will be automatically
220    * created and committed for this operation only, otherwise, the supplied transaction
221    * will be used and it will be up to the caller to commit the transaction at its
222    * discretion.
223    *
224    * @param partition   - The ChampPartition that you wish to create or update in the graph.
225    * @param transaction - Optional transaction context to perform the operation in.
226    *
227    * @return - The {@link ChampPartition} as it was stored.
228    *
229    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
230    *                                               specifed by {@link ChampGraph#retrieveSchema}
231    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
232    *                                               but the object cannot be found in the graph
233    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
234    *                                               marshalled into the backend representation
235    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
236    *                                               by this relationship does not exist in the graph
237    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
238    */
239   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
240
241   /**
242    * Removes a partition from the graph.
243    * <p>
244    * If a transaction context is not provided, then a transaction will be automatically
245    * created and committed for this operation only, otherwise, the supplied transaction
246    * will be used and it will be up to the caller to commit the transaction at its
247    * discretion.
248    *
249    * @param graph       - The partition to be removed.
250    * @param transaction - Optional transaction context to perform the operation in.
251    *
252    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
253    */
254   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
255
256   /**
257    * Create or update an object index in the graph.
258    *
259    * @param index       - The object index to be created/updated.
260    */
261   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
262
263   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
264   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
265   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
266   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
267   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
268   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
269   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
270   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
271   public abstract ChampSchema                      retrieveSchema();
272   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
273   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
274   public abstract void                             deleteSchema();
275   public abstract ChampCapabilities                capabilities();
276
277
278
279   public final static String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
280   public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
281
282   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
283   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
284
285   public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
286
287
288
289   /** Number of events that can be queued up for publication before we begin dropping
290    *  events. */
291   private Integer eventQueueCapacity;
292
293   /** Number of event publisher worker threads. */
294   private Integer eventStreamPublisherPoolSize;
295
296   /** Pool of worker threads that do the work of publishing the events to the event bus. */
297   protected ThreadPoolExecutor publisherPool;
298
299   /** Client used for publishing events to the event bus. */
300   protected EventPublisher producer;
301
302   /** Internal queue where outgoing events will be buffered until they can be serviced by
303    *  the event publisher worker threads. */
304   protected BlockingQueue<ChampEvent> eventQueue;
305
306
307   /**
308    * Thread factory for the event producer workers.
309    */
310   private class ProducerWorkerThreadFactory implements ThreadFactory {
311
312     private AtomicInteger threadNumber = new AtomicInteger(1);
313
314     public Thread newThread(Runnable r) {
315       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
316     }
317   }
318
319
320   /**
321    * Create a new instance of the AbstractLoggingChampGraph.
322    *
323    * @param properties - Set of configuration properties for this graph instance.
324    */
325   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
326
327     // Extract the necessary parameters from the configuration properties.
328     configure(properties);
329
330     // Make sure we were passed an event producer as one of our properties, otherwise
331     // there is really nothing more we can do...
332     if(producer == null) {
333       logger.error("No event stream producer was supplied.");
334       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
335       return;
336     }
337
338     // Create the blocking queue that we will use to buffer events that we want
339     // published to the event bus.
340     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
341
342     // Create the executor pool that will do the work of publishing events to the event bus.
343     publisherPool =
344         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
345                                                           new ProducerWorkerThreadFactory());
346
347     try {
348
349       // Start up the producer worker threads.
350       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
351          publisherPool.submit(new EventPublisherWorker());
352       }
353
354     } catch (Exception e) {
355
356       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
357       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
358       return;
359     }
360   }
361
362
363   /**
364    * Process the configuration properties supplied for this graph instance.
365    *
366    * @param properties - Configuration parameters.
367    */
368   private void configure(Map<String, Object> properties) {
369
370     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
371
372     eventQueueCapacity =
373         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
374     eventStreamPublisherPoolSize =
375         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
376   }
377
378
379   public void setProducer(EventPublisher aProducer) {
380
381     producer = aProducer;
382   }
383
384   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
385
386     if(properties.containsKey(property)) {
387       return properties.get(property);
388     } else {
389       return defaultValue;
390     }
391   }
392
393   @Override
394   public void shutdown() {
395
396     if(publisherPool != null) {
397       publisherPool.shutdown();
398
399       try {
400         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
401       } catch (InterruptedException e) {
402         logger.warn("Termination interrupted");
403         Thread.currentThread().interrupt();
404       }
405     }
406
407     if(producer != null) {
408
409       try {
410         producer.close();
411
412       } catch (Exception e) {
413         logger.error("Failed to stop event stream producer: " + e.getMessage());
414       }
415     }
416   }
417
418   @Override
419   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
420
421     try {
422
423       // Commit the transaction.
424       transaction.commit();
425
426     } catch (ChampTransactionException e) {
427
428       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
429
430       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
431       for(ChampEvent event : enqueuedEvents) {
432
433         logger.debug("Graph event " + event.toString() + " not published.");
434       }
435       throw e;
436     }
437
438     // Now that the transaction has been successfully committed, we need
439     // to log the events that were produced within that transaction's
440     // context.
441     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
442     for(ChampEvent event : enqueuedEvents) {
443       logEvent(event);
444     }
445   }
446
447   @Override
448   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
449
450     // Rollback the transaction.
451     transaction.rollback();
452   }
453
454   @Override
455   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
456     return storeObject(object, Optional.empty());
457   }
458
459   @Override
460   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
461
462     ChampObject storedObject = executeStoreObject(object, transaction);
463
464     if(storedObject != null) {
465
466       logOrEnqueueEvent(ChampEvent.builder()
467                                     .operation(ChampOperation.STORE)
468                                     .entity(storedObject)
469                                     .build(),
470                         transaction);
471     }
472
473     return storedObject;
474   }
475
476   @Override
477   public ChampObject replaceObject(ChampObject object)
478       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
479
480     return replaceObject(object, Optional.empty());
481   }
482
483   @Override
484   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
485       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
486
487     ChampObject replacedObject = executeReplaceObject(object, transaction);
488
489     if(replacedObject != null) {
490
491       logOrEnqueueEvent(ChampEvent.builder()
492                                   .operation(ChampOperation.REPLACE)
493                                   .entity(replacedObject)
494                                   .build(),
495                         transaction);
496     }
497
498     return replacedObject;
499   }
500
501   @Override
502   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
503     deleteObject(key, Optional.empty());
504   }
505
506   @Override
507   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
508
509     // Retrieve the object that we are deleting before it's gone, so that we can
510     // report it to the event stream.
511     Optional<ChampObject> objectToDelete = Optional.empty();
512     try {
513       objectToDelete = retrieveObject(key, transaction);
514
515     } catch (ChampUnmarshallingException e) {
516       logger.error("Unable to generate delete object log: " + e.getMessage());
517     }
518
519     executeDeleteObject(key, transaction);
520
521     if(objectToDelete.isPresent()) {
522       // Update the event stream with the current operation.
523       logOrEnqueueEvent(ChampEvent.builder()
524                                   .operation(ChampOperation.DELETE)
525                                   .entity(objectToDelete.get())
526                                   .build(),
527                         transaction);
528     }
529   }
530
531   @Override
532   public ChampRelationship storeRelationship(ChampRelationship relationship)
533       throws ChampUnmarshallingException,
534              ChampMarshallingException,
535              ChampObjectNotExistsException,
536              ChampSchemaViolationException,
537              ChampRelationshipNotExistsException, ChampTransactionException {
538       return storeRelationship(relationship, Optional.empty());
539   }
540
541   @Override
542   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
543       throws ChampUnmarshallingException,
544              ChampMarshallingException,
545              ChampObjectNotExistsException,
546              ChampSchemaViolationException,
547              ChampRelationshipNotExistsException, ChampTransactionException {
548
549     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
550
551     if(storedRelationship != null) {
552
553       // Update the event stream with the current operation.
554       logOrEnqueueEvent(ChampEvent.builder()
555                                   .operation(ChampOperation.STORE)
556                                   .entity(storedRelationship)
557                                   .build(),
558                         transaction);
559     }
560
561     return storedRelationship;
562   }
563
564   @Override
565   public ChampRelationship replaceRelationship(ChampRelationship relationship)
566       throws ChampUnmarshallingException,
567              ChampMarshallingException,
568              ChampSchemaViolationException,
569              ChampRelationshipNotExistsException, ChampTransactionException {
570     return replaceRelationship(relationship, Optional.empty());
571   }
572
573   @Override
574   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
575       throws ChampUnmarshallingException,
576              ChampMarshallingException,
577              ChampSchemaViolationException,
578              ChampRelationshipNotExistsException, ChampTransactionException {
579
580     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
581
582     if(replacedRelationship != null) {
583
584       // Update the event stream with the current operation.
585       logOrEnqueueEvent(ChampEvent.builder()
586                                   .operation(ChampOperation.REPLACE)
587                                   .entity(replacedRelationship)
588                                   .build(),
589                         transaction);
590     }
591
592     return replacedRelationship;
593   }
594
595   @Override
596   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
597     deleteRelationship(relationship, Optional.empty());
598   }
599
600   @Override
601   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
602
603     executeDeleteRelationship(relationship, transaction);
604
605     // Update the event stream with the current operation.
606     logOrEnqueueEvent(ChampEvent.builder()
607                                 .operation(ChampOperation.DELETE)
608                                 .entity(relationship)
609                                 .build(),
610                       transaction);
611   }
612
613   @Override
614   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
615     return storePartition(partition, Optional.empty());
616   }
617
618   @Override
619   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
620
621     ChampPartition storedPartition = executeStorePartition(partition, transaction);
622
623     if(storedPartition != null) {
624
625       // Update the event stream with the current operation.
626       logOrEnqueueEvent(ChampEvent.builder()
627                                   .operation(ChampOperation.STORE)
628                                   .entity(storedPartition)
629                                   .build(),
630                         transaction);
631     }
632
633     return storedPartition;
634   }
635
636   @Override
637   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
638     deletePartition(graph, Optional.empty());
639   }
640
641   @Override
642   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
643
644     executeDeletePartition(graph, transaction);
645
646     // Update the event stream with the current operation.
647     logOrEnqueueEvent(ChampEvent.builder()
648                                 .operation(ChampOperation.DELETE)
649                                 .entity(graph)
650                                 .build(),
651                       transaction);
652   }
653
654   @Override
655   public void storeObjectIndex(ChampObjectIndex index) {
656
657     executeStoreObjectIndex(index);
658
659     // Update the event stream with the current operation.
660     logEvent(ChampEvent.builder()
661                   .operation(ChampOperation.STORE)
662                   .entity(index)
663                   .build());
664   }
665
666
667   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
668
669     // Retrieve the index that we are deleting before it's gone, so that we can
670     // report it to the event stream.
671     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
672
673     executeDeleteObjectIndex(indexName);
674
675     if(indexToDelete.isPresent()) {
676       // Update the event stream with the current operation.
677       logEvent(ChampEvent.builder()
678                     .operation(ChampOperation.DELETE)
679                     .entity(indexToDelete.get())
680                     .build());
681     }
682   }
683
684
685   public void storeRelationshipIndex(ChampRelationshipIndex index) {
686
687     executeStoreRelationshipIndex(index);
688
689     // Update the event stream with the current operation.
690     logEvent(ChampEvent.builder()
691                   .operation(ChampOperation.STORE)
692                   .entity(index)
693                   .build());
694   }
695
696
697   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
698
699     // Retrieve the index that we are deleting before it's gone, so that we can
700     // report it to the event stream.
701     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
702
703     executeDeleteRelationshipIndex(indexName);
704
705     if(indexToDelete.isPresent()) {
706       // Update the event stream with the current operation.
707       logEvent(ChampEvent.builder()
708                     .operation(ChampOperation.DELETE)
709                     .entity(indexToDelete.get())
710                     .build());
711     }
712   }
713
714   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
715
716     if(!transaction.isPresent()) {
717       // Update the event stream with the current operation.
718       logEvent(event);
719     } else {
720
721       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
722       event.setDbTransactionId ( transaction.get ().id () );
723       transaction.get().logEvent(event);
724     }
725   }
726
727   /**
728    * Submits an event to be published to the event stream.
729    *
730    * @param anEvent - The event to be published.
731    */
732   public void logEvent(ChampEvent anEvent) {
733
734     if(eventQueue == null) {
735       return;
736     }
737
738     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
739     if(logger.isDebugEnabled()) {
740       logger.debug("Event payload: " + anEvent.toString());
741     }
742
743     // Try to submit the event to be published to the event bus.
744     if(!eventQueue.offer(anEvent)) {
745       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
746     }
747   }
748
749
750   /**
751    * This class implements the worker threads for our thread pool which are responsible for
752    * pulling the next outgoing event from the internal buffer and forwarding them to the event
753    * bus client.
754    * <p>
755    * Each publish operation is performed synchronously, so that the thread will only move on
756    * to the next available event once it has actually published the current event to the bus.
757    */
758   private class EventPublisherWorker implements Runnable {
759
760     /** Partition key to use when publishing events to the event stream.  We WANT all events
761      *  to go to a single partition, so we are just using a hard-coded key for every event. */
762     private static final String EVENTS_PARTITION_KEY = "champEventKey";
763
764
765     @Override
766     public void run() {
767
768       while(true) {
769         ChampEvent event = null;
770         try {
771
772           // Get the next event to be published from the queue.
773           event = eventQueue.take();
774
775         } catch (InterruptedException e) {
776
777           // Restore the interrupted status.
778           Thread.currentThread().interrupt();
779         }
780
781         // Create new envelope containing an event header and ChampEvent
782         ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
783
784         // Try publishing the event to the event bus.  This call will block until
785         try {
786           producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
787
788         } catch (Exception e) {
789
790           logger.error("Failed to publish event to event bus: " + e.getMessage());
791         }
792       }
793     }
794   }
795 }