2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
62 * This class provides the hooks to allow Champ operations to be logged to an event
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
67 public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
68 public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
69 public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
70 public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
71 public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
73 /** Pool of worker threads that do the work of publishing the events to the event bus. */
74 protected ThreadPoolExecutor publisherPool;
76 /** Client used for publishing events to the event bus. */
77 protected EventPublisher producer;
79 /** Internal queue where outgoing events will be buffered until they can be serviced by
80 * the event publisher worker threads. */
81 protected BlockingQueue<ChampEvent> eventQueue;
83 /** Number of events that can be queued up for publication before we begin dropping
85 private Integer eventQueueCapacity;
87 /** Number of event publisher worker threads. */
88 private Integer eventStreamPublisherPoolSize;
90 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
94 * Create a new instance of the AbstractLoggingChampGraph.
96 * @param properties - Set of configuration properties for this graph instance.
98 protected AbstractLoggingChampGraph(Map<String, Object> properties) {
100 // Extract the necessary parameters from the configuration properties.
101 configure(properties);
103 // Make sure we were passed an event producer as one of our properties, otherwise
104 // there is really nothing more we can do...
105 if(producer == null) {
106 logger.error("No event stream producer was supplied.");
107 logger.error("NOTE!! Champ events will NOT be published to the event stream!");
111 // Create the blocking queue that we will use to buffer events that we want
112 // published to the event bus.
113 eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
115 // Create the executor pool that will do the work of publishing events to the event bus.
117 (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
118 new ProducerWorkerThreadFactory());
122 // Start up the producer worker threads.
123 for(int i=0; i<eventStreamPublisherPoolSize; i++) {
124 publisherPool.submit(new EventPublisherWorker());
127 } catch (Exception e) {
129 logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
130 logger.error("NOTE!! Champ events may NOT be published to the event stream!");
137 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
138 public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
139 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
140 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
142 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
144 public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
145 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
146 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
147 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
149 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
152 * Creates or updates a vertex in the graph data store.
154 * If a transaction context is not provided, then a transaction will be automatically
155 * created and committed for this operation only, otherwise, the supplied transaction
156 * will be used and it will be up to the caller to commit the transaction at its
159 * @param object - The vertex to be created or updated.
160 * @param transaction - Optional transaction context to perform the operation in.
162 * @return - The vertex, as created, marshaled as a {@link ChampObject}
164 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
165 * into the backend representation
166 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
167 * by {@link ChampGraph#retrieveSchema}
168 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
169 * is not present or object not found in the graph
170 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
172 public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
175 * Updates an existing vertex in the graph store.
177 * If a transaction context is not provided, then a transaction will be automatically
178 * created and committed for this operation only, otherwise, the supplied transaction
179 * will be used and it will be up to the caller to commit the transaction at its
182 * @param object - The vertex to be created or updated.
183 * @param transaction - Optional transaction context to perform the operation in.
185 * @return - The updated vertex, marshaled as a {@link ChampObject}
187 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
188 * the backend representation
189 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
190 * by {@link ChampGraph#retrieveSchema}
191 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
192 * is not present or object not found in the graph
193 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
195 public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
198 * Deletes an existing vertex from the graph store.
200 * If a transaction context is not provided, then a transaction will be automatically
201 * created and committed for this operation only, otherwise, the supplied transaction
202 * will be used and it will be up to the caller to commit the transaction at its
205 * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
206 * @param transaction - Optional transaction context to perform the operation in.
208 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
209 * is not present or object not found in the graph
210 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
212 public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
215 * Creates or updates an edge in the graph data store.
217 * If a transaction context is not provided, then a transaction will be automatically
218 * created and committed for this operation only, otherwise, the supplied transaction
219 * will be used and it will be up to the caller to commit the transaction at its
222 * @param relationship - The ChampRelationship that you wish to store in the graph
223 * @param transaction - Optional transaction context to perform the operation in.
225 * @return - The {@link ChampRelationship} as it was stored.
227 * @throws ChampUnmarshallingException - If the edge which was created could not be
228 * unmarshalled into a ChampRelationship
229 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
230 * marshalled into the backend representation
231 * @throws ChampObjectNotExistsException - If either the source or target object referenced
232 * by this relationship does not exist in the graph
233 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
234 * specifed by {@link ChampGraph#retrieveSchema}
235 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
236 * but the object cannot be found in the graph
237 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
239 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
242 * Replaces an existing edge in the graph data store.
244 * If a transaction context is not provided, then a transaction will be automatically
245 * created and committed for this operation only, otherwise, the supplied transaction
246 * will be used and it will be up to the caller to commit the transaction at its
249 * @param relationship - The ChampRelationship that you wish to replace in the graph
250 * @param transaction - Optional transaction context to perform the operation in.
252 * @return - The {@link ChampRelationship} as it was stored.
254 * @throws ChampUnmarshallingException - If the edge which was created could not be
255 * unmarshalled into a ChampRelationship
256 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
257 * marshalled into the backend representation
258 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
259 * specifed by {@link ChampGraph#retrieveSchema}
260 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
261 * but the object cannot be found in the graph
262 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
264 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
267 * Removes an edge from the graph data store.
269 * If a transaction context is not provided, then a transaction will be automatically
270 * created and committed for this operation only, otherwise, the supplied transaction
271 * will be used and it will be up to the caller to commit the transaction at its
274 * @param relationship - The ChampRelationship that you wish to remove from the graph.
275 * @param transaction - Optional transaction context to perform the operation in.
277 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
278 * but the object cannot be found in the graph
279 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
281 public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
284 * Create or update a {@link ChampPartition}.
286 * If a transaction context is not provided, then a transaction will be automatically
287 * created and committed for this operation only, otherwise, the supplied transaction
288 * will be used and it will be up to the caller to commit the transaction at its
291 * @param partition - The ChampPartition that you wish to create or update in the graph.
292 * @param transaction - Optional transaction context to perform the operation in.
294 * @return - The {@link ChampPartition} as it was stored.
296 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
297 * specifed by {@link ChampGraph#retrieveSchema}
298 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
299 * but the object cannot be found in the graph
300 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
301 * marshalled into the backend representation
302 * @throws ChampObjectNotExistsException - If either the source or target object referenced
303 * by this relationship does not exist in the graph
304 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
306 public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
309 * Removes a partition from the graph.
311 * If a transaction context is not provided, then a transaction will be automatically
312 * created and committed for this operation only, otherwise, the supplied transaction
313 * will be used and it will be up to the caller to commit the transaction at its
316 * @param graph - The partition to be removed.
317 * @param transaction - Optional transaction context to perform the operation in.
319 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
321 public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
324 * Create or update an object index in the graph.
326 * @param index - The object index to be created/updated.
328 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
329 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
330 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
331 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
332 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
333 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
334 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
335 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
336 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
337 public abstract ChampSchema retrieveSchema();
338 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
339 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
340 public abstract void deleteSchema();
341 public abstract ChampCapabilities capabilities();
345 * Thread factory for the event producer workers.
347 private class ProducerWorkerThreadFactory implements ThreadFactory {
349 private AtomicInteger threadNumber = new AtomicInteger(1);
351 public Thread newThread(Runnable r) {
352 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
358 * Process the configuration properties supplied for this graph instance.
360 * @param properties - Configuration parameters.
362 private void configure(Map<String, Object> properties) {
364 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
367 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
368 eventStreamPublisherPoolSize =
369 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
373 public void setProducer(EventPublisher aProducer) {
375 producer = aProducer;
378 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
380 if(properties.containsKey(property)) {
381 return properties.get(property);
388 public void shutdown() {
390 if(publisherPool != null) {
391 publisherPool.shutdown();
394 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
395 } catch (InterruptedException e) {}
398 if(producer != null) {
403 } catch (Exception e) {
404 logger.error("Failed to stop event stream producer: " + e.getMessage());
410 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
414 // Commit the transaction.
415 transaction.commit();
417 } catch (ChampTransactionException e) {
419 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
421 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
422 for(ChampEvent event : enqueuedEvents) {
424 logger.debug("Graph event " + event.toString() + " not published.");
429 // Now that the transaction has been successfully committed, we need
430 // to log the events that were produced within that transaction's
432 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
433 for(ChampEvent event : enqueuedEvents) {
439 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
441 // Rollback the transaction.
442 transaction.rollback();
446 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
447 return storeObject(object, Optional.empty());
451 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
453 ChampObject storedObject = executeStoreObject(object, transaction);
455 if(storedObject != null) {
457 logOrEnqueueEvent(ChampEvent.builder()
458 .operation(ChampOperation.STORE)
459 .entity(storedObject)
468 public ChampObject replaceObject(ChampObject object)
469 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
471 return replaceObject(object, Optional.empty());
475 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
476 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
478 ChampObject replacedObject = executeReplaceObject(object, transaction);
480 if(replacedObject != null) {
482 logOrEnqueueEvent(ChampEvent.builder()
483 .operation(ChampOperation.REPLACE)
484 .entity(replacedObject)
489 return replacedObject;
493 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
494 deleteObject(key, Optional.empty());
498 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
500 // Retrieve the object that we are deleting before it's gone, so that we can
501 // report it to the event stream.
502 Optional<ChampObject> objectToDelete = Optional.empty();
504 objectToDelete = retrieveObject(key, transaction);
506 } catch (ChampUnmarshallingException e) {
507 logger.error("Unable to generate delete object log: " + e.getMessage());
510 executeDeleteObject(key, transaction);
512 if(objectToDelete.isPresent()) {
513 // Update the event stream with the current operation.
514 logOrEnqueueEvent(ChampEvent.builder()
515 .operation(ChampOperation.DELETE)
516 .entity(objectToDelete.get())
523 public ChampRelationship storeRelationship(ChampRelationship relationship)
524 throws ChampUnmarshallingException,
525 ChampMarshallingException,
526 ChampObjectNotExistsException,
527 ChampSchemaViolationException,
528 ChampRelationshipNotExistsException, ChampTransactionException {
529 return storeRelationship(relationship, Optional.empty());
533 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
534 throws ChampUnmarshallingException,
535 ChampMarshallingException,
536 ChampObjectNotExistsException,
537 ChampSchemaViolationException,
538 ChampRelationshipNotExistsException, ChampTransactionException {
540 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
542 if(storedRelationship != null) {
544 // Update the event stream with the current operation.
545 logOrEnqueueEvent(ChampEvent.builder()
546 .operation(ChampOperation.STORE)
547 .entity(storedRelationship)
552 return storedRelationship;
556 public ChampRelationship replaceRelationship(ChampRelationship relationship)
557 throws ChampUnmarshallingException,
558 ChampMarshallingException,
559 ChampSchemaViolationException,
560 ChampRelationshipNotExistsException, ChampTransactionException {
561 return replaceRelationship(relationship, Optional.empty());
565 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
566 throws ChampUnmarshallingException,
567 ChampMarshallingException,
568 ChampSchemaViolationException,
569 ChampRelationshipNotExistsException, ChampTransactionException {
571 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
573 if(replacedRelationship != null) {
575 // Update the event stream with the current operation.
576 logOrEnqueueEvent(ChampEvent.builder()
577 .operation(ChampOperation.REPLACE)
578 .entity(replacedRelationship)
583 return replacedRelationship;
587 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
588 deleteRelationship(relationship, Optional.empty());
592 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
594 executeDeleteRelationship(relationship, transaction);
596 // Update the event stream with the current operation.
597 logOrEnqueueEvent(ChampEvent.builder()
598 .operation(ChampOperation.DELETE)
599 .entity(relationship)
605 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
606 return storePartition(partition, Optional.empty());
610 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
612 ChampPartition storedPartition = executeStorePartition(partition, transaction);
614 if(storedPartition != null) {
616 // Update the event stream with the current operation.
617 logOrEnqueueEvent(ChampEvent.builder()
618 .operation(ChampOperation.STORE)
619 .entity(storedPartition)
624 return storedPartition;
628 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
629 deletePartition(graph, Optional.empty());
633 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
635 executeDeletePartition(graph, transaction);
637 // Update the event stream with the current operation.
638 logOrEnqueueEvent(ChampEvent.builder()
639 .operation(ChampOperation.DELETE)
646 public void storeObjectIndex(ChampObjectIndex index) {
648 executeStoreObjectIndex(index);
650 // Update the event stream with the current operation.
651 logEvent(ChampEvent.builder()
652 .operation(ChampOperation.STORE)
658 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
660 // Retrieve the index that we are deleting before it's gone, so that we can
661 // report it to the event stream.
662 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
664 executeDeleteObjectIndex(indexName);
666 if(indexToDelete.isPresent()) {
667 // Update the event stream with the current operation.
668 logEvent(ChampEvent.builder()
669 .operation(ChampOperation.DELETE)
670 .entity(indexToDelete.get())
676 public void storeRelationshipIndex(ChampRelationshipIndex index) {
678 executeStoreRelationshipIndex(index);
680 // Update the event stream with the current operation.
681 logEvent(ChampEvent.builder()
682 .operation(ChampOperation.STORE)
688 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
690 // Retrieve the index that we are deleting before it's gone, so that we can
691 // report it to the event stream.
692 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
694 executeDeleteRelationshipIndex(indexName);
696 if(indexToDelete.isPresent()) {
697 // Update the event stream with the current operation.
698 logEvent(ChampEvent.builder()
699 .operation(ChampOperation.DELETE)
700 .entity(indexToDelete.get())
705 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
707 if(!transaction.isPresent()) {
708 // Update the event stream with the current operation.
712 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
713 event.setDbTransactionId ( transaction.get ().id () );
714 transaction.get().logEvent(event);
719 * Submits an event to be published to the event stream.
721 * @param anEvent - The event to be published.
723 public void logEvent(ChampEvent anEvent) {
725 if(eventQueue == null) {
729 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
730 if(logger.isDebugEnabled()) {
731 logger.debug("Event payload: " + anEvent.toString());
734 // Try to submit the event to be published to the event bus.
735 if(!eventQueue.offer(anEvent)) {
736 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
742 * This class implements the worker threads for our thread pool which are responsible for
743 * pulling the next outgoing event from the internal buffer and forwarding them to the event
746 * Each publish operation is performed synchronously, so that the thread will only move on
747 * to the next available event once it has actually published the current event to the bus.
749 private class EventPublisherWorker implements Runnable {
751 /** Partition key to use when publishing events to the event stream. We WANT all events
752 * to go to a single partition, so we are just using a hard-coded key for every event. */
753 private static final String EVENTS_PARTITION_KEY = "champEventKey";
760 ChampEvent event = null;
763 // Get the next event to be published from the queue.
764 event = eventQueue.take();
766 } catch (InterruptedException e) {
768 // Restore the interrupted status.
769 Thread.currentThread().interrupt();
772 // Create new envelope containing an event header and ChampEvent
773 ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
775 // Try publishing the event to the event bus. This call will block until
777 producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
779 } catch (Exception e) {
781 logger.error("Failed to publish event to event bus: " + e.getMessage());