2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.onap.aai.champcore.event;
25 import java.util.List;
27 import java.util.Optional;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.ThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.stream.Stream;
37 import org.onap.aai.champcore.ChampCapabilities;
38 import org.onap.aai.champcore.ChampGraph;
39 import org.onap.aai.champcore.ChampTransaction;
40 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
41 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
43 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
46 import org.onap.aai.champcore.exceptions.ChampTransactionException;
47 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPartition;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 import org.onap.aai.event.api.EventPublisher;
64 * This class provides the hooks to allow Champ operations to be logged to an event
67 public abstract class AbstractLoggingChampGraph implements ChampGraph {
69 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
71 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
72 public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
73 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
74 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
76 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
78 public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
79 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
81 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
82 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
86 * Creates or updates a vertex in the graph data store.
88 * If a transaction context is not provided, then a transaction will be automatically
89 * created and committed for this operation only, otherwise, the supplied transaction
90 * will be used and it will be up to the caller to commit the transaction at its
93 * @param object - The vertex to be created or updated.
94 * @param transaction - Optional transaction context to perform the operation in.
96 * @return - The vertex, as created, marshaled as a {@link ChampObject}
98 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
99 * into the backend representation
100 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
101 * by {@link ChampGraph#retrieveSchema}
102 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
103 * is not present or object not found in the graph
104 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
106 public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
109 * Updates an existing vertex in the graph store.
111 * If a transaction context is not provided, then a transaction will be automatically
112 * created and committed for this operation only, otherwise, the supplied transaction
113 * will be used and it will be up to the caller to commit the transaction at its
116 * @param object - The vertex to be created or updated.
117 * @param transaction - Optional transaction context to perform the operation in.
119 * @return - The updated vertex, marshaled as a {@link ChampObject}
121 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
122 * the backend representation
123 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
124 * by {@link ChampGraph#retrieveSchema}
125 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
126 * is not present or object not found in the graph
127 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
129 public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
132 * Deletes an existing vertex from the graph store.
134 * If a transaction context is not provided, then a transaction will be automatically
135 * created and committed for this operation only, otherwise, the supplied transaction
136 * will be used and it will be up to the caller to commit the transaction at its
139 * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
140 * @param transaction - Optional transaction context to perform the operation in.
142 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
143 * is not present or object not found in the graph
144 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
146 public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
149 * Creates or updates an edge in the graph data store.
151 * If a transaction context is not provided, then a transaction will be automatically
152 * created and committed for this operation only, otherwise, the supplied transaction
153 * will be used and it will be up to the caller to commit the transaction at its
156 * @param relationship - The ChampRelationship that you wish to store in the graph
157 * @param transaction - Optional transaction context to perform the operation in.
159 * @return - The {@link ChampRelationship} as it was stored.
161 * @throws ChampUnmarshallingException - If the edge which was created could not be
162 * unmarshalled into a ChampRelationship
163 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
164 * marshalled into the backend representation
165 * @throws ChampObjectNotExistsException - If either the source or target object referenced
166 * by this relationship does not exist in the graph
167 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
168 * specifed by {@link ChampGraph#retrieveSchema}
169 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
170 * but the object cannot be found in the graph
171 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
173 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
176 * Replaces an existing edge in the graph data store.
178 * If a transaction context is not provided, then a transaction will be automatically
179 * created and committed for this operation only, otherwise, the supplied transaction
180 * will be used and it will be up to the caller to commit the transaction at its
183 * @param relationship - The ChampRelationship that you wish to replace in the graph
184 * @param transaction - Optional transaction context to perform the operation in.
186 * @return - The {@link ChampRelationship} as it was stored.
188 * @throws ChampUnmarshallingException - If the edge which was created could not be
189 * unmarshalled into a ChampRelationship
190 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
191 * marshalled into the backend representation
192 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
193 * specifed by {@link ChampGraph#retrieveSchema}
194 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
195 * but the object cannot be found in the graph
196 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
198 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
201 * Removes an edge from the graph data store.
203 * If a transaction context is not provided, then a transaction will be automatically
204 * created and committed for this operation only, otherwise, the supplied transaction
205 * will be used and it will be up to the caller to commit the transaction at its
208 * @param relationship - The ChampRelationship that you wish to remove from the graph.
209 * @param transaction - Optional transaction context to perform the operation in.
211 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
212 * but the object cannot be found in the graph
213 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
215 public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
218 * Create or update a {@link ChampPartition}.
220 * If a transaction context is not provided, then a transaction will be automatically
221 * created and committed for this operation only, otherwise, the supplied transaction
222 * will be used and it will be up to the caller to commit the transaction at its
225 * @param partition - The ChampPartition that you wish to create or update in the graph.
226 * @param transaction - Optional transaction context to perform the operation in.
228 * @return - The {@link ChampPartition} as it was stored.
230 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
231 * specifed by {@link ChampGraph#retrieveSchema}
232 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
233 * but the object cannot be found in the graph
234 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
235 * marshalled into the backend representation
236 * @throws ChampObjectNotExistsException - If either the source or target object referenced
237 * by this relationship does not exist in the graph
238 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
240 public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
243 * Removes a partition from the graph.
245 * If a transaction context is not provided, then a transaction will be automatically
246 * created and committed for this operation only, otherwise, the supplied transaction
247 * will be used and it will be up to the caller to commit the transaction at its
250 * @param graph - The partition to be removed.
251 * @param transaction - Optional transaction context to perform the operation in.
253 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
255 public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
258 * Create or update an object index in the graph.
260 * @param index - The object index to be created/updated.
262 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
264 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
265 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
266 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
267 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
268 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
269 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
270 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
271 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
272 public abstract ChampSchema retrieveSchema();
273 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
274 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
275 public abstract void deleteSchema();
276 public abstract ChampCapabilities capabilities();
280 public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
281 public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
283 public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
284 public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
286 public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
290 /** Number of events that can be queued up for publication before we begin dropping
292 private Integer eventQueueCapacity;
294 /** Number of event publisher worker threads. */
295 private Integer eventStreamPublisherPoolSize;
297 /** Pool of worker threads that do the work of publishing the events to the event bus. */
298 protected ThreadPoolExecutor publisherPool;
300 /** Client used for publishing events to the event bus. */
301 protected EventPublisher producer;
303 /** Internal queue where outgoing events will be buffered until they can be serviced by
304 * the event publisher worker threads. */
305 protected BlockingQueue<ChampEvent> eventQueue;
309 * Thread factory for the event producer workers.
311 private class ProducerWorkerThreadFactory implements ThreadFactory {
313 private AtomicInteger threadNumber = new AtomicInteger(1);
315 public Thread newThread(Runnable r) {
316 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
322 * Create a new instance of the AbstractLoggingChampGraph.
324 * @param properties - Set of configuration properties for this graph instance.
326 protected AbstractLoggingChampGraph(Map<String, Object> properties) {
328 // Extract the necessary parameters from the configuration properties.
329 configure(properties);
331 // Make sure we were passed an event producer as one of our properties, otherwise
332 // there is really nothing more we can do...
333 if(producer == null) {
334 logger.error("No event stream producer was supplied.");
335 logger.error("NOTE!! Champ events will NOT be published to the event stream!");
339 // Create the blocking queue that we will use to buffer events that we want
340 // published to the event bus.
341 eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
343 // Create the executor pool that will do the work of publishing events to the event bus.
345 (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
346 new ProducerWorkerThreadFactory());
350 // Start up the producer worker threads.
351 for(int i=0; i<eventStreamPublisherPoolSize; i++) {
352 publisherPool.submit(new EventPublisherWorker());
355 } catch (Exception e) {
357 logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
358 logger.error("NOTE!! Champ events may NOT be published to the event stream!");
365 * Process the configuration properties supplied for this graph instance.
367 * @param properties - Configuration parameters.
369 private void configure(Map<String, Object> properties) {
371 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
374 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
375 eventStreamPublisherPoolSize =
376 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
380 public void setProducer(EventPublisher aProducer) {
382 producer = aProducer;
385 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
387 if(properties.containsKey(property)) {
388 return properties.get(property);
395 public void shutdown() {
397 if(publisherPool != null) {
398 publisherPool.shutdown();
401 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
402 } catch (InterruptedException e) {}
405 if(producer != null) {
410 } catch (Exception e) {
411 logger.error("Failed to stop event stream producer: " + e.getMessage());
417 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
421 // Commit the transaction.
422 transaction.commit();
424 } catch (ChampTransactionException e) {
426 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
428 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
429 for(ChampEvent event : enqueuedEvents) {
431 logger.debug("Graph event " + event.toString() + " not published.");
436 // Now that the transaction has been successfully committed, we need
437 // to log the events that were produced within that transaction's
439 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
440 for(ChampEvent event : enqueuedEvents) {
446 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
448 // Rollback the transaction.
449 transaction.rollback();
453 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
454 return storeObject(object, Optional.empty());
458 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
460 ChampObject storedObject = executeStoreObject(object, transaction);
462 if(storedObject != null) {
464 logOrEnqueueEvent(ChampEvent.builder()
465 .operation(ChampOperation.STORE)
466 .entity(storedObject)
475 public ChampObject replaceObject(ChampObject object)
476 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
478 return replaceObject(object, Optional.empty());
482 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
483 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
485 ChampObject replacedObject = executeReplaceObject(object, transaction);
487 if(replacedObject != null) {
489 logOrEnqueueEvent(ChampEvent.builder()
490 .operation(ChampOperation.REPLACE)
491 .entity(replacedObject)
496 return replacedObject;
500 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
501 deleteObject(key, Optional.empty());
505 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
507 // Retrieve the object that we are deleting before it's gone, so that we can
508 // report it to the event stream.
509 Optional<ChampObject> objectToDelete = Optional.empty();
511 objectToDelete = retrieveObject(key, transaction);
513 } catch (ChampUnmarshallingException e) {
514 logger.error("Unable to generate delete object log: " + e.getMessage());
517 executeDeleteObject(key, transaction);
519 if(objectToDelete.isPresent()) {
520 // Update the event stream with the current operation.
521 logOrEnqueueEvent(ChampEvent.builder()
522 .operation(ChampOperation.DELETE)
523 .entity(objectToDelete.get())
530 public ChampRelationship storeRelationship(ChampRelationship relationship)
531 throws ChampUnmarshallingException,
532 ChampMarshallingException,
533 ChampObjectNotExistsException,
534 ChampSchemaViolationException,
535 ChampRelationshipNotExistsException, ChampTransactionException {
536 return storeRelationship(relationship, Optional.empty());
540 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
541 throws ChampUnmarshallingException,
542 ChampMarshallingException,
543 ChampObjectNotExistsException,
544 ChampSchemaViolationException,
545 ChampRelationshipNotExistsException, ChampTransactionException {
547 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
549 if(storedRelationship != null) {
551 // Update the event stream with the current operation.
552 logOrEnqueueEvent(ChampEvent.builder()
553 .operation(ChampOperation.STORE)
554 .entity(storedRelationship)
559 return storedRelationship;
563 public ChampRelationship replaceRelationship(ChampRelationship relationship)
564 throws ChampUnmarshallingException,
565 ChampMarshallingException,
566 ChampSchemaViolationException,
567 ChampRelationshipNotExistsException, ChampTransactionException {
568 return replaceRelationship(relationship, Optional.empty());
572 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
573 throws ChampUnmarshallingException,
574 ChampMarshallingException,
575 ChampSchemaViolationException,
576 ChampRelationshipNotExistsException, ChampTransactionException {
578 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
580 if(replacedRelationship != null) {
582 // Update the event stream with the current operation.
583 logOrEnqueueEvent(ChampEvent.builder()
584 .operation(ChampOperation.REPLACE)
585 .entity(replacedRelationship)
590 return replacedRelationship;
594 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
595 deleteRelationship(relationship, Optional.empty());
599 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
601 executeDeleteRelationship(relationship, transaction);
603 // Update the event stream with the current operation.
604 logOrEnqueueEvent(ChampEvent.builder()
605 .operation(ChampOperation.DELETE)
606 .entity(relationship)
612 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
613 return storePartition(partition, Optional.empty());
617 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
619 ChampPartition storedPartition = executeStorePartition(partition, transaction);
621 if(storedPartition != null) {
623 // Update the event stream with the current operation.
624 logOrEnqueueEvent(ChampEvent.builder()
625 .operation(ChampOperation.STORE)
626 .entity(storedPartition)
631 return storedPartition;
635 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
636 deletePartition(graph, Optional.empty());
640 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
642 executeDeletePartition(graph, transaction);
644 // Update the event stream with the current operation.
645 logOrEnqueueEvent(ChampEvent.builder()
646 .operation(ChampOperation.DELETE)
653 public void storeObjectIndex(ChampObjectIndex index) {
655 executeStoreObjectIndex(index);
657 // Update the event stream with the current operation.
658 logEvent(ChampEvent.builder()
659 .operation(ChampOperation.STORE)
665 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
667 // Retrieve the index that we are deleting before it's gone, so that we can
668 // report it to the event stream.
669 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
671 executeDeleteObjectIndex(indexName);
673 if(indexToDelete.isPresent()) {
674 // Update the event stream with the current operation.
675 logEvent(ChampEvent.builder()
676 .operation(ChampOperation.DELETE)
677 .entity(indexToDelete.get())
683 public void storeRelationshipIndex(ChampRelationshipIndex index) {
685 executeStoreRelationshipIndex(index);
687 // Update the event stream with the current operation.
688 logEvent(ChampEvent.builder()
689 .operation(ChampOperation.STORE)
695 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
697 // Retrieve the index that we are deleting before it's gone, so that we can
698 // report it to the event stream.
699 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
701 executeDeleteRelationshipIndex(indexName);
703 if(indexToDelete.isPresent()) {
704 // Update the event stream with the current operation.
705 logEvent(ChampEvent.builder()
706 .operation(ChampOperation.DELETE)
707 .entity(indexToDelete.get())
712 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
714 if(!transaction.isPresent()) {
715 // Update the event stream with the current operation.
719 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
720 event.setDbTransactionId ( transaction.get ().id () );
721 transaction.get().logEvent(event);
726 * Submits an event to be published to the event stream.
728 * @param anEvent - The event to be published.
730 public void logEvent(ChampEvent anEvent) {
732 if(eventQueue == null) {
736 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
737 if(logger.isDebugEnabled()) {
738 logger.debug("Event payload: " + anEvent.toString());
741 // Try to submit the event to be published to the event bus.
742 if(!eventQueue.offer(anEvent)) {
743 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
749 * This class implements the worker threads for our thread pool which are responsible for
750 * pulling the next outgoing event from the internal buffer and forwarding them to the event
753 * Each publish operation is performed synchronously, so that the thread will only move on
754 * to the next available event once it has actually published the current event to the bus.
756 private class EventPublisherWorker implements Runnable {
758 /** Partition key to use when publishing events to the event stream. We WANT all events
759 * to go to a single partition, so we are just using a hard-coded key for every event. */
760 private static final String EVENTS_PARTITION_KEY = "champEventKey";
768 ChampEvent event = null;
771 // Get the next event to be published from the queue.
772 event = eventQueue.take();
774 } catch (InterruptedException e) {
776 // Restore the interrupted status.
777 Thread.currentThread().interrupt();
780 // Try publishing the event to the event bus. This call will block
783 producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
785 } catch (Exception e) {
787 logger.error("Failed to publish event to event bus: " + e.getMessage());