2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
62 * This class provides the hooks to allow Champ operations to be logged to an event stream.
// Base class for ChampGraph implementations that report graph changes to an event bus.
// The public store/replace/delete methods further down delegate to the abstract
// execute* hooks and then pass a ChampEvent to logEvent() or logOrEnqueueEvent().
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
// Configuration keys read by configure(); each DEFAULT_* constant is what configure()
// passes to getProperty() as the default value for the key declared just above it.
67 public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
68 public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
69 public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
70 public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
// Key under which the EventPublisher instance itself is looked up; configure() reads it
// with a plain Map.get(), so there is no default for it.
71 public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
73 /** Pool of worker threads that do the work of publishing the events to the event bus. */
74 protected ThreadPoolExecutor publisherPool;
76 /** Client used for publishing events to the event bus. */
77 protected EventPublisher producer;
// logEvent() checks this field for null before using it.
79 /** Internal queue where outgoing events will be buffered until they can be serviced by
80 * the event publisher worker threads. */
81 protected BlockingQueue<ChampEvent> eventQueue;
83 /** Number of events that can be queued up for publication before we begin dropping
85 private Integer eventQueueCapacity;
87 /** Number of event publisher worker threads. */
88 private Integer eventStreamPublisherPoolSize;
// Shared SLF4J logger for this class.
90 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
94 * Create a new instance of the AbstractLoggingChampGraph.
96 * @param properties - Set of configuration properties for this graph instance.
// Reads the configuration and then sets up the event buffer and the pool of publisher
// worker threads that drain it.
98 protected AbstractLoggingChampGraph(Map<String, Object> properties) {
100 // Extract the necessary parameters from the configuration properties.
101 configure(properties);
103 // Make sure we were passed an event producer as one of our properties, otherwise
104 // there is really nothing more we can do...
105 if(producer == null) {
106 logger.error("No event stream producer was supplied.");
107 logger.error("NOTE!! Champ events will NOT be published to the event stream!");
// NOTE(review): this branch is expected to stop construction here, leaving eventQueue
// and publisherPool unset (logEvent() guards against a null eventQueue) - confirm.
111 // Create the blocking queue that we will use to buffer events that we want
112 // published to the event bus.
113 eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
115 // Create the executor pool that will do the work of publishing events to the event bus.
// Executors.newFixedThreadPool is declared to return ExecutorService; the downcast below
// relies on the JDK implementation handing back a ThreadPoolExecutor.
117 (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
118 new ProducerWorkerThreadFactory());
122 // Start up the producer worker threads.
// One EventPublisherWorker task is submitted per configured pool thread.
123 for(int i=0; i<eventStreamPublisherPoolSize; i++) {
124 publisherPool.submit(new EventPublisherWorker());
127 } catch (Exception e) {
// Only the exception message is logged here; the stack trace is not.
129 logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
130 logger.error("NOTE!! Champ events may NOT be published to the event stream!");
// Read-side operations. They are declared abstract here, so the concrete graph
// implementation supplies them; each comes in a plain form and a form that accepts an
// optional transaction context.

// Object lookup by key and by property query.
137 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
138 public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
139 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
140 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
// Relationship lookup by key, by owning object, and by property query.
142 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
144 public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
145 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
146 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
147 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
149 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
152 * Creates or updates a vertex in the graph data store.
154 * If a transaction context is not provided, then a transaction will be automatically
155 * created and committed for this operation only, otherwise, the supplied transaction
156 * will be used and it will be up to the caller to commit the transaction at its discretion.
159 * @param object - The vertex to be created or updated.
160 * @param transaction - Optional transaction context to perform the operation in.
162 * @return - The vertex, as created, marshaled as a {@link ChampObject}
164 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
165 * into the backend representation
166 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
167 * by {@link ChampGraph#retrieveSchema}
168 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
169 * is not present or object not found in the graph
170 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by storeObject(object, transaction); that wrapper reports a STORE
// event only when this method returns a non-null object.
172 public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
175 * Updates an existing vertex in the graph store.
177 * If a transaction context is not provided, then a transaction will be automatically
178 * created and committed for this operation only, otherwise, the supplied transaction
179 * will be used and it will be up to the caller to commit the transaction at its discretion.
182 * @param object - The vertex to be created or updated.
183 * @param transaction - Optional transaction context to perform the operation in.
185 * @return - The updated vertex, marshaled as a {@link ChampObject}
187 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
188 * the backend representation
189 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
190 * by {@link ChampGraph#retrieveSchema}
191 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
192 * is not present or object not found in the graph
193 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by replaceObject(object, transaction); that wrapper reports a
// REPLACE event only when this method returns a non-null object.
195 public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
198 * Deletes an existing vertex from the graph store.
200 * If a transaction context is not provided, then a transaction will be automatically
201 * created and committed for this operation only, otherwise, the supplied transaction
202 * will be used and it will be up to the caller to commit the transaction at its discretion.
205 * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
206 * @param transaction - Optional transaction context to perform the operation in.
208 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
209 * is not present or object not found in the graph
210 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by deleteObject(key, transaction) after that wrapper has looked the
// object up with retrieveObject(key, transaction) for use in the DELETE event.
212 public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
215 * Creates or updates an edge in the graph data store.
217 * If a transaction context is not provided, then a transaction will be automatically
218 * created and committed for this operation only, otherwise, the supplied transaction
219 * will be used and it will be up to the caller to commit the transaction at its discretion.
222 * @param relationship - The ChampRelationship that you wish to store in the graph
223 * @param transaction - Optional transaction context to perform the operation in.
225 * @return - The {@link ChampRelationship} as it was stored.
227 * @throws ChampUnmarshallingException - If the edge which was created could not be
228 * unmarshalled into a ChampRelationship
229 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
230 * marshalled into the backend representation
231 * @throws ChampObjectNotExistsException - If either the source or target object referenced
232 * by this relationship does not exist in the graph
233 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
234 * specified by {@link ChampGraph#retrieveSchema}
235 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
236 * but the object cannot be found in the graph
237 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by storeRelationship(relationship, transaction); that wrapper
// reports a STORE event only when this method returns a non-null relationship.
239 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
242 * Replaces an existing edge in the graph data store.
244 * If a transaction context is not provided, then a transaction will be automatically
245 * created and committed for this operation only, otherwise, the supplied transaction
246 * will be used and it will be up to the caller to commit the transaction at its discretion.
249 * @param relationship - The ChampRelationship that you wish to replace in the graph
250 * @param transaction - Optional transaction context to perform the operation in.
252 * @return - The {@link ChampRelationship} as it was stored.
254 * @throws ChampUnmarshallingException - If the edge which was created could not be
255 * unmarshalled into a ChampRelationship
256 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
257 * marshalled into the backend representation
258 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
259 * specified by {@link ChampGraph#retrieveSchema}
260 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
261 * but the object cannot be found in the graph
262 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by replaceRelationship(relationship, transaction); that wrapper
// reports a REPLACE event only when this method returns a non-null relationship.
264 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
267 * Removes an edge from the graph data store.
269 * If a transaction context is not provided, then a transaction will be automatically
270 * created and committed for this operation only, otherwise, the supplied transaction
271 * will be used and it will be up to the caller to commit the transaction at its discretion.
274 * @param relationship - The ChampRelationship that you wish to remove from the graph.
275 * @param transaction - Optional transaction context to perform the operation in.
277 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
278 * but the object cannot be found in the graph
279 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by deleteRelationship(relationship, transaction); the DELETE event
// built afterwards carries the relationship instance that was passed in.
281 public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
284 * Create or update a {@link ChampPartition}.
286 * If a transaction context is not provided, then a transaction will be automatically
287 * created and committed for this operation only, otherwise, the supplied transaction
288 * will be used and it will be up to the caller to commit the transaction at its discretion.
291 * @param partition - The ChampPartition that you wish to create or update in the graph.
292 * @param transaction - Optional transaction context to perform the operation in.
294 * @return - The {@link ChampPartition} as it was stored.
296 * @throws ChampSchemaViolationException - If the {@code partition} violates the constraints
297 * specified by {@link ChampGraph#retrieveSchema}
298 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
299 * but the object cannot be found in the graph
300 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
301 * marshalled into the backend representation
302 * @throws ChampObjectNotExistsException - If either the source or target object referenced
303 * by this relationship does not exist in the graph
304 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by storePartition(partition, transaction); that wrapper reports a
// STORE event only when this method returns a non-null partition.
306 public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
309 * Removes a partition from the graph.
311 * If a transaction context is not provided, then a transaction will be automatically
312 * created and committed for this operation only, otherwise, the supplied transaction
313 * will be used and it will be up to the caller to commit the transaction at its discretion.
316 * @param graph - The partition to be removed.
317 * @param transaction - Optional transaction context to perform the operation in.
319 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
// Backend hook called by deletePartition(graph, transaction) before that wrapper builds
// its DELETE event.
321 public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
324 * Create or update an object index in the graph.
326 * @param index - The object index to be created/updated.
// Object index operations. The execute* hooks are called by storeObjectIndex() and
// deleteObjectIndex(), which then report the change through logEvent().
328 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
329 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
330 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
331 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
// Relationship index operations, mirroring the object index set above; the execute*
// hooks are called by storeRelationshipIndex() and deleteRelationshipIndex().
332 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
333 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
334 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
335 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
// Schema management and capability reporting are declared abstract here and are not
// wrapped by any event logging in this class.
336 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
337 public abstract ChampSchema retrieveSchema();
338 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
339 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
340 public abstract void deleteSchema();
341 public abstract ChampCapabilities capabilities();
345 * Thread factory for the event producer workers.
347 private class ProducerWorkerThreadFactory implements ThreadFactory {
349 private AtomicInteger threadNumber = new AtomicInteger(1);
351 public Thread newThread(Runnable r) {
352 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
358 * Process the configuration properties supplied for this graph instance.
360 * @param properties - Configuration parameters.
// Pulls the event publisher, the queue capacity and the publisher pool size out of the
// supplied configuration properties.
362 private void configure(Map<String, Object> properties) {
// Plain lookup with an unchecked downcast: a missing key leaves producer null, and a
// value of any other type fails with ClassCastException.
364 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
// Both numeric settings are cast straight to Integer, so a value supplied as another
// type (for example a String) fails with ClassCastException.
// NOTE(review): the result of the next call is presumably assigned to eventQueueCapacity - confirm.
367 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
368 eventStreamPublisherPoolSize =
369 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
373 public void setProducer(EventPublisher aProducer) {
375 producer = aProducer;
378 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
380 if(properties.containsKey(property)) {
381 return properties.get(property);
// Stops the publisher thread pool (when one was created) and then deals with the event
// producer (when one was supplied).
388 public void shutdown() {
390 if(publisherPool != null) {
// ExecutorService.shutdown() stops new submissions but does not interrupt tasks that are
// already running.
// NOTE(review): workers blocked in eventQueue.take() may therefore outlive the wait
// below - confirm whether shutdownNow() is what is wanted here.
391 publisherPool.shutdown();
// Wait at most one second for the pool to terminate.
394 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
395 } catch (InterruptedException e) {
396 logger.warn("Termination interrupted");
// Re-assert the interrupt so code further up the call stack can still observe it.
397 Thread.currentThread().interrupt();
401 if(producer != null) {
// NOTE(review): the producer is presumably stopped/closed inside this guard; a failure
// to do so is only logged (message only), not propagated - confirm.
406 } catch (Exception e) {
407 logger.error("Failed to stop event stream producer: " + e.getMessage());
// Commits the supplied transaction and then walks the events that were enqueued on it
// so that they can be logged to the event stream.
413 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
417 // Commit the transaction.
418 transaction.commit();
420 } catch (ChampTransactionException e) {
// Commit failed: warn once for the transaction, then record at debug level every
// enqueued event that is not going to be published.
422 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
424 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
425 for(ChampEvent event : enqueuedEvents) {
427 logger.debug("Graph event " + event.toString() + " not published.");
// NOTE(review): the failure path is expected to propagate the exception so that the
// loop below is skipped - confirm.
432 // Now that the transaction has been successfully committed, we need
433 // to log the events that were produced within that transaction's
435 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
436 for(ChampEvent event : enqueuedEvents) {
442 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
444 // Rollback the transaction.
445 transaction.rollback();
// Convenience overload: stores the object without a caller-supplied transaction.
449 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
450 return storeObject(object, Optional.empty());
// Stores the object through the backend hook and, when that yields a result, builds a
// STORE event for it and passes the event to logOrEnqueueEvent().
454 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
456 ChampObject storedObject = executeStoreObject(object, transaction);
// A null result from the hook produces no event.
458 if(storedObject != null) {
// The event carries the object as returned by the backend, not the caller's argument.
460 logOrEnqueueEvent(ChampEvent.builder()
461 .operation(ChampOperation.STORE)
462 .entity(storedObject)
// Convenience overload: replaces the object without a caller-supplied transaction.
471 public ChampObject replaceObject(ChampObject object)
472 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
474 return replaceObject(object, Optional.empty());
// Replaces the object through the backend hook and, when that yields a result, builds a
// REPLACE event for it and passes the event to logOrEnqueueEvent().
478 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
479 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
481 ChampObject replacedObject = executeReplaceObject(object, transaction);
// A null result from the hook produces no event.
483 if(replacedObject != null) {
485 logOrEnqueueEvent(ChampEvent.builder()
486 .operation(ChampOperation.REPLACE)
487 .entity(replacedObject)
// The backend's result is returned to the caller whether or not an event was built.
492 return replacedObject;
// Convenience overload: deletes the object without a caller-supplied transaction.
496 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
497 deleteObject(key, Optional.empty());
// Deletes the object through the backend hook and reports a DELETE event describing the
// object as it looked just before deletion.
501 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
503 // Retrieve the object that we are deleting before it's gone, so that we can
504 // report it to the event stream.
505 Optional<ChampObject> objectToDelete = Optional.empty();
507 objectToDelete = retrieveObject(key, transaction);
509 } catch (ChampUnmarshallingException e) {
// The lookup failure is only logged; objectToDelete keeps its empty initial value, so
// the delete below still appears to go ahead but without an event.
510 logger.error("Unable to generate delete object log: " + e.getMessage());
513 executeDeleteObject(key, transaction);
// No event is built when the pre-delete lookup found nothing.
515 if(objectToDelete.isPresent()) {
516 // Update the event stream with the current operation.
517 logOrEnqueueEvent(ChampEvent.builder()
518 .operation(ChampOperation.DELETE)
519 .entity(objectToDelete.get())
// Convenience overload: stores the relationship without a caller-supplied transaction.
526 public ChampRelationship storeRelationship(ChampRelationship relationship)
527 throws ChampUnmarshallingException,
528 ChampMarshallingException,
529 ChampObjectNotExistsException,
530 ChampSchemaViolationException,
531 ChampRelationshipNotExistsException, ChampTransactionException {
532 return storeRelationship(relationship, Optional.empty());
// Stores the relationship through the backend hook and, when that yields a result,
// builds a STORE event for it and passes the event to logOrEnqueueEvent().
536 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
537 throws ChampUnmarshallingException,
538 ChampMarshallingException,
539 ChampObjectNotExistsException,
540 ChampSchemaViolationException,
541 ChampRelationshipNotExistsException, ChampTransactionException {
543 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
// A null result from the hook produces no event.
545 if(storedRelationship != null) {
547 // Update the event stream with the current operation.
548 logOrEnqueueEvent(ChampEvent.builder()
549 .operation(ChampOperation.STORE)
550 .entity(storedRelationship)
// The backend's result is returned to the caller whether or not an event was built.
555 return storedRelationship;
// Convenience overload: replaces the relationship without a caller-supplied transaction.
559 public ChampRelationship replaceRelationship(ChampRelationship relationship)
560 throws ChampUnmarshallingException,
561 ChampMarshallingException,
562 ChampSchemaViolationException,
563 ChampRelationshipNotExistsException, ChampTransactionException {
564 return replaceRelationship(relationship, Optional.empty());
// Replaces the relationship through the backend hook and, when that yields a result,
// builds a REPLACE event for it and passes the event to logOrEnqueueEvent().
568 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
569 throws ChampUnmarshallingException,
570 ChampMarshallingException,
571 ChampSchemaViolationException,
572 ChampRelationshipNotExistsException, ChampTransactionException {
574 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
// A null result from the hook produces no event.
576 if(replacedRelationship != null) {
578 // Update the event stream with the current operation.
579 logOrEnqueueEvent(ChampEvent.builder()
580 .operation(ChampOperation.REPLACE)
581 .entity(replacedRelationship)
// The backend's result is returned to the caller whether or not an event was built.
586 return replacedRelationship;
// Convenience overload: deletes the relationship without a caller-supplied transaction.
590 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
591 deleteRelationship(relationship, Optional.empty());
// Deletes the relationship through the backend hook, then builds a DELETE event and
// passes it to logOrEnqueueEvent().
595 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
597 executeDeleteRelationship(relationship, transaction);
599 // Update the event stream with the current operation.
// The event carries the relationship instance supplied by the caller; it is not
// re-read from the graph first.
600 logOrEnqueueEvent(ChampEvent.builder()
601 .operation(ChampOperation.DELETE)
602 .entity(relationship)
// Convenience overload: stores the partition without a caller-supplied transaction.
608 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
609 return storePartition(partition, Optional.empty());
// Stores the partition through the backend hook and, when that yields a result, builds
// a STORE event for it and passes the event to logOrEnqueueEvent().
613 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
615 ChampPartition storedPartition = executeStorePartition(partition, transaction);
// A null result from the hook produces no event.
617 if(storedPartition != null) {
619 // Update the event stream with the current operation.
620 logOrEnqueueEvent(ChampEvent.builder()
621 .operation(ChampOperation.STORE)
622 .entity(storedPartition)
// The backend's result is returned to the caller whether or not an event was built.
627 return storedPartition;
// Convenience overload: deletes the partition without a caller-supplied transaction.
631 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
632 deletePartition(graph, Optional.empty());
// Deletes the partition through the backend hook, then builds a DELETE event and passes
// it to logOrEnqueueEvent().
636 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
638 executeDeletePartition(graph, transaction);
640 // Update the event stream with the current operation.
641 logOrEnqueueEvent(ChampEvent.builder()
642 .operation(ChampOperation.DELETE)
// Creates or updates an object index through the backend hook and reports a STORE event.
// There is no transaction parameter here, so the event goes straight to logEvent().
649 public void storeObjectIndex(ChampObjectIndex index) {
651 executeStoreObjectIndex(index);
653 // Update the event stream with the current operation.
654 logEvent(ChampEvent.builder()
655 .operation(ChampOperation.STORE)
// Deletes an object index through the backend hook and reports a DELETE event that
// describes the index as it was before deletion.
661 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
663 // Retrieve the index that we are deleting before it's gone, so that we can
664 // report it to the event stream.
665 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
667 executeDeleteObjectIndex(indexName);
// No event is built when the pre-delete lookup found no index under that name.
669 if(indexToDelete.isPresent()) {
670 // Update the event stream with the current operation.
671 logEvent(ChampEvent.builder()
672 .operation(ChampOperation.DELETE)
673 .entity(indexToDelete.get())
// Creates or updates a relationship index through the backend hook and reports a STORE
// event. There is no transaction parameter here, so the event goes straight to logEvent().
679 public void storeRelationshipIndex(ChampRelationshipIndex index) {
681 executeStoreRelationshipIndex(index);
683 // Update the event stream with the current operation.
684 logEvent(ChampEvent.builder()
685 .operation(ChampOperation.STORE)
// Deletes a relationship index through the backend hook and reports a DELETE event that
// describes the index as it was before deletion.
691 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
693 // Retrieve the index that we are deleting before it's gone, so that we can
694 // report it to the event stream.
695 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
697 executeDeleteRelationshipIndex(indexName);
// No event is built when the pre-delete lookup found no index under that name.
699 if(indexToDelete.isPresent()) {
700 // Update the event stream with the current operation.
701 logEvent(ChampEvent.builder()
702 .operation(ChampOperation.DELETE)
703 .entity(indexToDelete.get())
// Routes an event according to whether the operation ran inside a caller-supplied
// transaction: without one the event is dealt with right away, with one it is recorded
// on the transaction instead.
708 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
710 if(!transaction.isPresent()) {
711 // Update the event stream with the current operation.
715 // When the TransactionID is present, add it to the event payload before logging/enqueuing the event.
716 event.setDbTransactionId ( transaction.get ().id () );
// Hand the event to the transaction rather than to the event queue.
// NOTE(review): presumably this is what getEnqueuedEvents() later returns to
// commitTransaction() - confirm against ChampTransaction.
717 transaction.get().logEvent(event);
722 * Submits an event to be published to the event stream.
724 * @param anEvent - The event to be published.
726 public void logEvent(ChampEvent anEvent) {
728 if(eventQueue == null) {
732 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
733 if(logger.isDebugEnabled()) {
734 logger.debug("Event payload: " + anEvent.toString());
737 // Try to submit the event to be published to the event bus.
738 if(!eventQueue.offer(anEvent)) {
739 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
745 * This class implements the worker threads for our thread pool which are responsible for
746 * pulling the next outgoing event from the internal buffer and forwarding it to the event bus.
749 * Each publish operation is performed synchronously, so that the thread will only move on
750 * to the next available event once it has actually published the current event to the bus.
// Task run by the publisher pool threads: takes events off eventQueue, wraps each one in
// a ChampEventEnvelope and sends its JSON form to the event bus through the producer.
752 private class EventPublisherWorker implements Runnable {
754 /** Partition key to use when publishing events to the event stream. We WANT all events
755 * to go to a single partition, so we are just using a hard-coded key for every event. */
756 private static final String EVENTS_PARTITION_KEY = "champEventKey";
763 ChampEvent event = null;
766 // Get the next event to be published from the queue.
// take() blocks while the queue is empty.
767 event = eventQueue.take();
769 } catch (InterruptedException e) {
771 // Restore the interrupted status.
772 Thread.currentThread().interrupt();
// NOTE(review): after an interrupt `event` is still null, and nothing visible here stops
// the envelope below from being built around it - confirm that the worker exits instead.
775 // Create new envelope containing an event header and ChampEvent
776 ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
778 // Try publishing the event to the event bus. This call will block until the publish attempt has completed.
780 producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
782 } catch (Exception e) {
// A failed publish is logged (message only); nothing here puts the event back on the queue.
784 logger.error("Failed to publish event to event bus: " + e.getMessage());