Merge "Added sonar fixes"
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59
60
61 /**
62  * This class provides the hooks to allow Champ operations to be logged to an event
63  * stream.
64  */
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
66
67   public static final String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
68   public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
69   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
70   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
71   public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
72   protected static final String KEY_PROPERTY_NAME = "aai-uuid";
73   protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";
74
75   /** Pool of worker threads that do the work of publishing the events to the event bus. */
76   protected ThreadPoolExecutor publisherPool;
77
78   /** Client used for publishing events to the event bus. */
79   protected EventPublisher producer;
80
81   /** Internal queue where outgoing events will be buffered until they can be serviced by
82    *  the event publisher worker threads. */
83   protected BlockingQueue<ChampEvent> eventQueue;
84
85   /** Number of events that can be queued up for publication before we begin dropping
86    *  events. */
87   private Integer eventQueueCapacity;
88
89   /** Number of event publisher worker threads. */
90   private Integer eventStreamPublisherPoolSize;
91
92   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
93
94
95   /**
96    * Create a new instance of the AbstractLoggingChampGraph.
97    *
98    * @param properties - Set of configuration properties for this graph instance.
99    */
100   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
101
102     // Extract the necessary parameters from the configuration properties.
103     configure(properties);
104
105     // Make sure we were passed an event producer as one of our properties, otherwise
106     // there is really nothing more we can do...
107     if(producer == null) {
108       logger.error("No event stream producer was supplied.");
109       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
110       return;
111     }
112
113     // Create the blocking queue that we will use to buffer events that we want
114     // published to the event bus.
115     eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
116
117     // Create the executor pool that will do the work of publishing events to the event bus.
118     publisherPool =
119         (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
120             new ProducerWorkerThreadFactory());
121
122     try {
123
124       // Start up the producer worker threads.
125       for(int i=0; i<eventStreamPublisherPoolSize; i++) {
126         publisherPool.submit(new EventPublisherWorker());
127       }
128
129     } catch (Exception e) {
130
131       logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
132       logger.error("NOTE!! Champ events may NOT be published to the event stream!");
133       return;
134     }
135   }
136
137
138
139   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
140   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
141   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
142   public abstract Stream<ChampObject>         queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
143   @Override
144   public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
145   @Override
146   public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
147   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
148   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
149   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
150
151   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
152
153   /**
154     * Creates or updates a vertex in the graph data store.
155     * <p>
156     * If a transaction context is not provided, then a transaction will be automatically
157     * created and committed for this operation only, otherwise, the supplied transaction
158     * will be used and it will be up to the caller to commit the transaction at its
159     * discretion.
160     *
161     * @param object      - The vertex to be created or updated.
162     * @param transaction - Optional transaction context to perform the operation in.
163     *
164     * @return - The vertex, as created, marshaled as a {@link ChampObject}
165     *
166     * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
167     *                                         into the backend representation
168     * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
169     *                                         by {@link ChampGraph#retrieveSchema}
170     * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
171     *                                         is not present or object not found in the graph
172     * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
173     */
174   public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
175
176   /**
177    * Updates an existing vertex in the graph store.
178    * <p>
179    * If a transaction context is not provided, then a transaction will be automatically
180    * created and committed for this operation only, otherwise, the supplied transaction
181    * will be used and it will be up to the caller to commit the transaction at its
182    * discretion.
183    *
184    * @param object      - The vertex to be created or updated.
185    * @param transaction - Optional transaction context to perform the operation in.
186    *
187    * @return - The updated vertex, marshaled as a {@link ChampObject}
188    *
189    * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
190    *                                         the backend representation
191    * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
192    *                                         by {@link ChampGraph#retrieveSchema}
193    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
194    *                                         is not present or object not found in the graph
195    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
196    */
197   public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
198
199   /**
200    * Deletes an existing vertex from the graph store.
201    * <p>
202    * If a transaction context is not provided, then a transaction will be automatically
203    * created and committed for this operation only, otherwise, the supplied transaction
204    * will be used and it will be up to the caller to commit the transaction at its
205    * discretion.
206    *
207    * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
208    * @param transaction - Optional transaction context to perform the operation in.
209    *
210    * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
211    *                                         is not present or object not found in the graph
212    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
213    */
214   public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
215
216   /**
217    * Creates or updates an edge in the graph data store.
218    * <p>
219    * If a transaction context is not provided, then a transaction will be automatically
220    * created and committed for this operation only, otherwise, the supplied transaction
221    * will be used and it will be up to the caller to commit the transaction at its
222    * discretion.
223    *
224    * @param relationship - The ChampRelationship that you wish to store in the graph
225    * @param transaction  - Optional transaction context to perform the operation in.
226    *
227    * @return - The {@link ChampRelationship} as it was stored.
228    *
229    * @throws ChampUnmarshallingException         - If the edge which was created could not be
230    *                                               unmarshalled into a ChampRelationship
231    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
232    *                                               marshalled into the backend representation
233    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
234    *                                               by this relationship does not exist in the graph
235    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
236    *                                               specifed by {@link ChampGraph#retrieveSchema}
237    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
238    *                                               but the object cannot be found in the graph
239    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
240    */
241   public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
242
243   /**
244    * Replaces an existing edge in the graph data store.
245    * <p>
246    * If a transaction context is not provided, then a transaction will be automatically
247    * created and committed for this operation only, otherwise, the supplied transaction
248    * will be used and it will be up to the caller to commit the transaction at its
249    * discretion.
250    *
251    * @param relationship  - The ChampRelationship that you wish to replace in the graph
252    * @param transaction   - Optional transaction context to perform the operation in.
253    *
254    * @return - The {@link ChampRelationship} as it was stored.
255    *
256    * @throws ChampUnmarshallingException         - If the edge which was created could not be
257    *                                               unmarshalled into a ChampRelationship
258    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
259    *                                               marshalled into the backend representation
260    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
261    *                                               specifed by {@link ChampGraph#retrieveSchema}
262    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
263    *                                               but the object cannot be found in the graph
264    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
265    */
266   public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
267
268   /**
269    * Removes an edge from the graph data store.
270    * <p>
271    * If a transaction context is not provided, then a transaction will be automatically
272    * created and committed for this operation only, otherwise, the supplied transaction
273    * will be used and it will be up to the caller to commit the transaction at its
274    * discretion.
275    *
276    * @param relationship - The ChampRelationship that you wish to remove from the graph.
277    * @param transaction  - Optional transaction context to perform the operation in.
278    *
279    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
280    *                                               but the object cannot be found in the graph
281    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
282    */
283   public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
284
285   /**
286    * Create or update a {@link ChampPartition}.
287    * <p>
288    * If a transaction context is not provided, then a transaction will be automatically
289    * created and committed for this operation only, otherwise, the supplied transaction
290    * will be used and it will be up to the caller to commit the transaction at its
291    * discretion.
292    *
293    * @param partition   - The ChampPartition that you wish to create or update in the graph.
294    * @param transaction - Optional transaction context to perform the operation in.
295    *
296    * @return - The {@link ChampPartition} as it was stored.
297    *
298    * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
299    *                                               specifed by {@link ChampGraph#retrieveSchema}
300    * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
301    *                                               but the object cannot be found in the graph
302    * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
303    *                                               marshalled into the backend representation
304    * @throws ChampObjectNotExistsException       - If either the source or target object referenced
305    *                                               by this relationship does not exist in the graph
306    * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
307    */
308   public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
309
310   /**
311    * Removes a partition from the graph.
312    * <p>
313    * If a transaction context is not provided, then a transaction will be automatically
314    * created and committed for this operation only, otherwise, the supplied transaction
315    * will be used and it will be up to the caller to commit the transaction at its
316    * discretion.
317    *
318    * @param graph       - The partition to be removed.
319    * @param transaction - Optional transaction context to perform the operation in.
320    *
321    * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
322    */
323   public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
324
325   /**
326    * Create or update an object index in the graph.
327    *
328    * @param index       - The object index to be created/updated.
329    */
330   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
331   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
332   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
333   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
334   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
335   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
336   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
337   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
338   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
339   public abstract ChampSchema                      retrieveSchema();
340   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
341   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
342   public abstract void                             deleteSchema();
343   public abstract ChampCapabilities                capabilities();
344
345
346   /**
347    * Thread factory for the event producer workers.
348    */
349   private class ProducerWorkerThreadFactory implements ThreadFactory {
350
351     private AtomicInteger threadNumber = new AtomicInteger(1);
352
353     public Thread newThread(Runnable r) {
354       return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
355     }
356   }
357
358
359   /**
360    * Process the configuration properties supplied for this graph instance.
361    *
362    * @param properties - Configuration parameters.
363    */
364   private void configure(Map<String, Object> properties) {
365
366     producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
367
368     eventQueueCapacity =
369         (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
370     eventStreamPublisherPoolSize =
371         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
372   }
373
374
375   public void setProducer(EventPublisher aProducer) {
376
377     producer = aProducer;
378   }
379
380   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
381
382     if(properties.containsKey(property)) {
383       return properties.get(property);
384     } else {
385       return defaultValue;
386     }
387   }
388
389   @Override
390   public void shutdown() {
391
392     if(publisherPool != null) {
393       publisherPool.shutdown();
394
395       try {
396         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
397       } catch (InterruptedException e) {
398         logger.warn("Termination interrupted");
399         Thread.currentThread().interrupt();
400       }
401     }
402
403     if(producer != null) {
404
405       try {
406         producer.close();
407
408       } catch (Exception e) {
409         logger.error("Failed to stop event stream producer: " + e.getMessage());
410       }
411     }
412   }
413
414   @Override
415   public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
416
417     try {
418
419       // Commit the transaction.
420       transaction.commit();
421
422     } catch (ChampTransactionException e) {
423
424       logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
425
426       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
427       for(ChampEvent event : enqueuedEvents) {
428
429         logger.debug("Graph event " + event.toString() + " not published.");
430       }
431       throw e;
432     }
433
434     // Now that the transaction has been successfully committed, we need
435     // to log the events that were produced within that transaction's
436     // context.
437     List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
438     for(ChampEvent event : enqueuedEvents) {
439       logEvent(event);
440     }
441   }
442
443   @Override
444   public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
445
446     // Rollback the transaction.
447     transaction.rollback();
448   }
449
450   @Override
451   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
452     return storeObject(object, Optional.empty());
453   }
454
455   @Override
456   public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
457
458     ChampObject storedObject = executeStoreObject(object, transaction);
459
460     if(storedObject != null) {
461
462       logOrEnqueueEvent(ChampEvent.builder()
463                                     .operation(ChampOperation.STORE)
464                                     .entity(storedObject)
465                                     .build(),
466                         transaction);
467     }
468
469     return storedObject;
470   }
471
472   @Override
473   public ChampObject replaceObject(ChampObject object)
474       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
475
476     return replaceObject(object, Optional.empty());
477   }
478
479   @Override
480   public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
481       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
482
483     ChampObject replacedObject = executeReplaceObject(object, transaction);
484
485     if(replacedObject != null) {
486
487       logOrEnqueueEvent(ChampEvent.builder()
488                                   .operation(ChampOperation.REPLACE)
489                                   .entity(replacedObject)
490                                   .build(),
491                         transaction);
492     }
493
494     return replacedObject;
495   }
496
497   @Override
498   public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
499     deleteObject(key, Optional.empty());
500   }
501
502   @Override
503   public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
504
505     // Retrieve the object that we are deleting before it's gone, so that we can
506     // report it to the event stream.
507     Optional<ChampObject> objectToDelete = Optional.empty();
508     try {
509       objectToDelete = retrieveObject(key, transaction);
510
511     } catch (ChampUnmarshallingException e) {
512       logger.error("Unable to generate delete object log: " + e.getMessage());
513     }
514
515     executeDeleteObject(key, transaction);
516
517     if(objectToDelete.isPresent()) {
518       // Update the event stream with the current operation.
519       logOrEnqueueEvent(ChampEvent.builder()
520                                   .operation(ChampOperation.DELETE)
521                                   .entity(objectToDelete.get())
522                                   .build(),
523                         transaction);
524     }
525   }
526
527   @Override
528   public ChampRelationship storeRelationship(ChampRelationship relationship)
529       throws ChampUnmarshallingException,
530              ChampMarshallingException,
531              ChampObjectNotExistsException,
532              ChampSchemaViolationException,
533              ChampRelationshipNotExistsException, ChampTransactionException {
534       return storeRelationship(relationship, Optional.empty());
535   }
536
537   @Override
538   public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
539       throws ChampUnmarshallingException,
540              ChampMarshallingException,
541              ChampObjectNotExistsException,
542              ChampSchemaViolationException,
543              ChampRelationshipNotExistsException, ChampTransactionException {
544
545     ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
546
547     if(storedRelationship != null) {
548
549       // Update the event stream with the current operation.
550       logOrEnqueueEvent(ChampEvent.builder()
551                                   .operation(ChampOperation.STORE)
552                                   .entity(storedRelationship)
553                                   .build(),
554                         transaction);
555     }
556
557     return storedRelationship;
558   }
559
560   @Override
561   public ChampRelationship replaceRelationship(ChampRelationship relationship)
562       throws ChampUnmarshallingException,
563              ChampMarshallingException,
564              ChampSchemaViolationException,
565              ChampRelationshipNotExistsException, ChampTransactionException {
566     return replaceRelationship(relationship, Optional.empty());
567   }
568
569   @Override
570   public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
571       throws ChampUnmarshallingException,
572              ChampMarshallingException,
573              ChampSchemaViolationException,
574              ChampRelationshipNotExistsException, ChampTransactionException {
575
576     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
577
578     if(replacedRelationship != null) {
579
580       // Update the event stream with the current operation.
581       logOrEnqueueEvent(ChampEvent.builder()
582                                   .operation(ChampOperation.REPLACE)
583                                   .entity(replacedRelationship)
584                                   .build(),
585                         transaction);
586     }
587
588     return replacedRelationship;
589   }
590
591   @Override
592   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
593     deleteRelationship(relationship, Optional.empty());
594   }
595
596   @Override
597   public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
598
599     executeDeleteRelationship(relationship, transaction);
600
601     // Update the event stream with the current operation.
602     logOrEnqueueEvent(ChampEvent.builder()
603                                 .operation(ChampOperation.DELETE)
604                                 .entity(relationship)
605                                 .build(),
606                       transaction);
607   }
608
609   @Override
610   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
611     return storePartition(partition, Optional.empty());
612   }
613
614   @Override
615   public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
616
617     ChampPartition storedPartition = executeStorePartition(partition, transaction);
618
619     if(storedPartition != null) {
620
621       // Update the event stream with the current operation.
622       logOrEnqueueEvent(ChampEvent.builder()
623                                   .operation(ChampOperation.STORE)
624                                   .entity(storedPartition)
625                                   .build(),
626                         transaction);
627     }
628
629     return storedPartition;
630   }
631
632   @Override
633   public void deletePartition(ChampPartition graph) throws ChampTransactionException{
634     deletePartition(graph, Optional.empty());
635   }
636
637   @Override
638   public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
639
640     executeDeletePartition(graph, transaction);
641
642     // Update the event stream with the current operation.
643     logOrEnqueueEvent(ChampEvent.builder()
644                                 .operation(ChampOperation.DELETE)
645                                 .entity(graph)
646                                 .build(),
647                       transaction);
648   }
649
650   @Override
651   public void storeObjectIndex(ChampObjectIndex index) {
652
653     executeStoreObjectIndex(index);
654
655     // Update the event stream with the current operation.
656     logEvent(ChampEvent.builder()
657                   .operation(ChampOperation.STORE)
658                   .entity(index)
659                   .build());
660   }
661
662
663   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
664
665     // Retrieve the index that we are deleting before it's gone, so that we can
666     // report it to the event stream.
667     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
668
669     executeDeleteObjectIndex(indexName);
670
671     if(indexToDelete.isPresent()) {
672       // Update the event stream with the current operation.
673       logEvent(ChampEvent.builder()
674                     .operation(ChampOperation.DELETE)
675                     .entity(indexToDelete.get())
676                     .build());
677     }
678   }
679
680
681   public void storeRelationshipIndex(ChampRelationshipIndex index) {
682
683     executeStoreRelationshipIndex(index);
684
685     // Update the event stream with the current operation.
686     logEvent(ChampEvent.builder()
687                   .operation(ChampOperation.STORE)
688                   .entity(index)
689                   .build());
690   }
691
692
693   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
694
695     // Retrieve the index that we are deleting before it's gone, so that we can
696     // report it to the event stream.
697     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
698
699     executeDeleteRelationshipIndex(indexName);
700
701     if(indexToDelete.isPresent()) {
702       // Update the event stream with the current operation.
703       logEvent(ChampEvent.builder()
704                     .operation(ChampOperation.DELETE)
705                     .entity(indexToDelete.get())
706                     .build());
707     }
708   }
709
710   private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
711
712     if(!transaction.isPresent()) {
713       // Update the event stream with the current operation.
714       logEvent(event);
715     } else {
716
717       // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
718       event.setDbTransactionId ( transaction.get ().id () );
719       transaction.get().logEvent(event);
720     }
721   }
722
723   /**
724    * Submits an event to be published to the event stream.
725    *
726    * @param anEvent - The event to be published.
727    */
728   public void logEvent(ChampEvent anEvent) {
729
730     if(eventQueue == null) {
731       return;
732     }
733
734     logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
735     if(logger.isDebugEnabled()) {
736       logger.debug("Event payload: " + anEvent.toString());
737     }
738
739     // Try to submit the event to be published to the event bus.
740     if(!eventQueue.offer(anEvent)) {
741       logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
742     }
743   }
744
745
746   /**
747    * This class implements the worker threads for our thread pool which are responsible for
748    * pulling the next outgoing event from the internal buffer and forwarding them to the event
749    * bus client.
750    * <p>
751    * Each publish operation is performed synchronously, so that the thread will only move on
752    * to the next available event once it has actually published the current event to the bus.
753    */
754   private class EventPublisherWorker implements Runnable {
755
756     /** Partition key to use when publishing events to the event stream.  We WANT all events
757      *  to go to a single partition, so we are just using a hard-coded key for every event. */
758     private static final String EVENTS_PARTITION_KEY = "champEventKey";
759
760
761     @Override
762     public void run() {
763
764       while(true) {
765         ChampEvent event = null;
766         try {
767
768           // Get the next event to be published from the queue.
769           event = eventQueue.take();
770
771         } catch (InterruptedException e) {
772
773           // Restore the interrupted status.
774           Thread.currentThread().interrupt();
775         }
776
777         // Create new envelope containing an event header and ChampEvent
778         ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
779
780         // Try publishing the event to the event bus.  This call will block until
781         try {
782           producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
783
784         } catch (Exception e) {
785
786           logger.error("Failed to publish event to event bus: " + e.getMessage());
787         }
788       }
789     }
790   }
791 }