2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampCoreMsgs;
38 import org.onap.aai.champcore.ChampGraph;
39 import org.onap.aai.champcore.ChampTransaction;
40 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
41 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
42 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
44 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
46 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
47 import org.onap.aai.champcore.exceptions.ChampTransactionException;
48 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
49 import org.onap.aai.champcore.model.ChampObject;
50 import org.onap.aai.champcore.model.ChampObjectConstraint;
51 import org.onap.aai.champcore.model.ChampObjectIndex;
52 import org.onap.aai.champcore.model.ChampPartition;
53 import org.onap.aai.champcore.model.ChampRelationship;
54 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
55 import org.onap.aai.champcore.model.ChampRelationshipIndex;
56 import org.onap.aai.champcore.model.ChampSchema;
57 import org.onap.aai.cl.api.Logger;
58 import org.onap.aai.cl.eelf.LoggerFactory;
59 import org.onap.aai.event.api.EventPublisher;
/**
 * This class provides the hooks to allow Champ operations to be logged to an event
 * stream.
 */
68 public abstract class AbstractLoggingChampGraph implements ChampGraph {
/** Configuration key for the capacity of the internal event buffer. */
public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
/** Buffer capacity applied when {@link #PARAM_EVENT_QUEUE_CAPACITY} is not configured. */
public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
/** Configuration key for the number of event publisher worker threads. */
public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
/** Worker count applied when {@link #PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE} is not configured. */
public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
/** Configuration key under which the {@link EventPublisher} instance is supplied. */
public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
// NOTE(review): the two property names below are not referenced in this part of the
// file; presumably they are consumed by subclasses - confirm.
protected static final String KEY_PROPERTY_NAME = "aai-uuid";
protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";

/** Pool of worker threads that do the work of publishing the events to the event bus. */
protected ThreadPoolExecutor publisherPool;

/** Client used for publishing events to the event bus. */
protected EventPublisher producer;

/** Internal queue where outgoing events will be buffered until they can be serviced by
 *  the event publisher worker threads. */
protected BlockingQueue<ChampEvent> eventQueue;

/** Number of events that can be queued up for publication before we begin dropping
 *  events. */
private Integer eventQueueCapacity;

/** Number of event publisher worker threads. */
private Integer eventStreamPublisherPoolSize;

/** Logger shared by all instances of this class. */
private static final Logger logger = LoggerFactory.getInstance().getLogger(AbstractLoggingChampGraph.class);
/**
 * Create a new instance of the AbstractLoggingChampGraph.
 * <p>
 * When no event producer is configured the instance is left without an event queue or
 * worker pool, and {@link #logEvent(ChampEvent)} silently discards events.
 *
 * @param properties - Set of configuration properties for this graph instance.
 */
protected AbstractLoggingChampGraph(Map<String, Object> properties) {

  // Read the producer, buffer capacity and pool size from the supplied configuration.
  configure(properties);

  // Without a producer there is nowhere to send events, so skip building the
  // publishing machinery altogether.
  if (producer == null) {
    logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
        "No event stream producer was supplied.");
    logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
        "NOTE!! Champ events will NOT be published to the event stream!");
    return;
  }

  // Bounded buffer holding the events that are waiting to be published.
  eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);

  // Fixed-size pool whose threads carry out the actual publishing.
  publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
      eventStreamPublisherPoolSize, new ProducerWorkerThreadFactory());

  try {

    // Hand one worker task to each thread in the pool.
    int started = 0;
    while (started < eventStreamPublisherPoolSize) {
      publisherPool.submit(new EventPublisherWorker());
      started++;
    }

  } catch (Exception e) {

    logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
        "Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
    logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
        "NOTE!! Champ events may NOT be published to the event stream!");
  }
}
// Read operations. They are declared abstract here and this class does not wrap them
// with any event publishing; concrete graph implementations supply them.
// retrieveObject(key, transaction) is also used by deleteObject(...) to capture the
// object being removed so that it can be reported in the DELETE event.
public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;

public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;

public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;

public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
/**
 * Creates or updates a vertex in the graph data store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #storeObject(ChampObject, Optional)}, which publishes a STORE event
 * when the value returned here is non-null.
 *
 * @param object      - The vertex to be created or updated.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The vertex, as created, marshaled as a {@link ChampObject}
 *
 * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
 *                                         into the backend representation
 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
 *                                         by {@link ChampGraph#retrieveSchema}
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Updates an existing vertex in the graph store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #replaceObject(ChampObject, Optional)}, which publishes a REPLACE
 * event when the value returned here is non-null.
 *
 * @param object      - The vertex to be updated.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The updated vertex, marshaled as a {@link ChampObject}
 *
 * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
 *                                         the backend representation
 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
 *                                         by {@link ChampGraph#retrieveSchema}
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Deletes an existing vertex from the graph store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #deleteObject(Object, Optional)} after that method has looked the
 * object up for inclusion in the DELETE event.
 *
 * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
/**
 * Creates or updates an edge in the graph data store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #storeRelationship(ChampRelationship, Optional)}, which publishes a
 * STORE event when the value returned here is non-null.
 *
 * @param relationship - The ChampRelationship that you wish to store in the graph
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampRelationship} as it was stored.
 *
 * @throws ChampUnmarshallingException         - If the edge which was created could not be
 *                                               unmarshalled into a ChampRelationship
 * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampObjectNotExistsException       - If either the source or target object referenced
 *                                               by this relationship does not exist in the graph
 * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Replaces an existing edge in the graph data store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #replaceRelationship(ChampRelationship, Optional)}, which publishes a
 * REPLACE event when the value returned here is non-null.
 *
 * @param relationship - The ChampRelationship that you wish to replace in the graph
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampRelationship} as it was stored.
 *
 * @throws ChampUnmarshallingException         - If the edge which was created could not be
 *                                               unmarshalled into a ChampRelationship
 * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Removes an edge from the graph data store.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #deleteRelationship(ChampRelationship, Optional)}, which afterwards
 * publishes a DELETE event for the supplied relationship.
 *
 * @param relationship - The ChampRelationship that you wish to remove from the graph.
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Create or update a {@link ChampPartition}.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #storePartition(ChampPartition, Optional)}, which publishes a STORE
 * event when the value returned here is non-null.
 *
 * @param partition   - The ChampPartition that you wish to create or update in the graph.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampPartition} as it was stored.
 *
 * @throws ChampSchemaViolationException       - If the {@code partition} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampMarshallingException           - If the {@code partition} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampObjectNotExistsException       - If either the source or target object referenced
 *                                               by a relationship does not exist in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Removes a partition from the graph.
 * <p>
 * If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 * <p>
 * Invoked by {@link #deletePartition(ChampPartition, Optional)}, which afterwards
 * publishes a DELETE event.
 *
 * @param graph       - The partition to be removed.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
/**
 * Create or update an object index in the graph.
 * <p>
 * Invoked by {@link #storeObjectIndex(ChampObjectIndex)}, which afterwards publishes a
 * STORE event.
 *
 * @param index - The object index to be created/updated.
 */
public abstract void executeStoreObjectIndex(ChampObjectIndex index);
// Index operations left to the concrete graph implementation. The executeXxx methods
// are the hooks called by the event-publishing wrappers of the same name without the
// "execute" prefix; the two retrieveXxxIndex(String) lookups are also used by the
// delete wrappers to capture the index being removed for the DELETE event.
public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
// Schema and capability operations. This class adds no event publishing around them.
public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
public abstract ChampSchema retrieveSchema();
public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
public abstract void deleteSchema();
public abstract ChampCapabilities capabilities();
354 * Thread factory for the event producer workers.
356 private class ProducerWorkerThreadFactory implements ThreadFactory {
358 private AtomicInteger threadNumber = new AtomicInteger(1);
360 public Thread newThread(Runnable r) {
361 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
367 * Process the configuration properties supplied for this graph instance.
369 * @param properties - Configuration parameters.
371 private void configure(Map<String, Object> properties) {
373 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
376 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
377 eventStreamPublisherPoolSize =
378 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
382 public void setProducer(EventPublisher aProducer) {
384 producer = aProducer;
387 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
389 if(properties.containsKey(property)) {
390 return properties.get(property);
397 public void shutdown() {
399 if(publisherPool != null) {
400 publisherPool.shutdown();
403 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
404 } catch (InterruptedException e) {
405 logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN,
406 "Termination interrupted");
407 Thread.currentThread().interrupt();
411 if(producer != null) {
416 } catch (Exception e) {
417 logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
418 "Failed to stop event stream producer: " + e.getMessage());
424 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
428 // Commit the transaction.
429 transaction.commit();
431 } catch (ChampTransactionException e) {
433 logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN,
434 "Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
436 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
437 for(ChampEvent event : enqueuedEvents) {
439 logger.debug("Graph event " + event.toString() + " not published.");
444 // Now that the transaction has been successfully committed, we need
445 // to log the events that were produced within that transaction's
447 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
448 for(ChampEvent event : enqueuedEvents) {
454 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
456 // Rollback the transaction.
457 transaction.rollback();
461 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
462 return storeObject(object, Optional.empty());
466 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
468 ChampObject storedObject = executeStoreObject(object, transaction);
470 if(storedObject != null) {
472 logOrEnqueueEvent(ChampEvent.builder()
473 .operation(ChampOperation.STORE)
474 .entity(storedObject)
483 public ChampObject replaceObject(ChampObject object)
484 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
486 return replaceObject(object, Optional.empty());
490 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
491 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
493 ChampObject replacedObject = executeReplaceObject(object, transaction);
495 if(replacedObject != null) {
497 logOrEnqueueEvent(ChampEvent.builder()
498 .operation(ChampOperation.REPLACE)
499 .entity(replacedObject)
504 return replacedObject;
508 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
509 deleteObject(key, Optional.empty());
513 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
515 // Retrieve the object that we are deleting before it's gone, so that we can
516 // report it to the event stream.
517 Optional<ChampObject> objectToDelete = Optional.empty();
519 objectToDelete = retrieveObject(key, transaction);
521 } catch (ChampUnmarshallingException e) {
522 logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
523 "Unable to generate delete object log: " + e.getMessage());
526 executeDeleteObject(key, transaction);
528 if(objectToDelete.isPresent()) {
529 // Update the event stream with the current operation.
530 logOrEnqueueEvent(ChampEvent.builder()
531 .operation(ChampOperation.DELETE)
532 .entity(objectToDelete.get())
539 public ChampRelationship storeRelationship(ChampRelationship relationship)
540 throws ChampUnmarshallingException,
541 ChampMarshallingException,
542 ChampObjectNotExistsException,
543 ChampSchemaViolationException,
544 ChampRelationshipNotExistsException, ChampTransactionException {
545 return storeRelationship(relationship, Optional.empty());
549 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
550 throws ChampUnmarshallingException,
551 ChampMarshallingException,
552 ChampObjectNotExistsException,
553 ChampSchemaViolationException,
554 ChampRelationshipNotExistsException, ChampTransactionException {
556 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
558 if(storedRelationship != null) {
560 // Update the event stream with the current operation.
561 logOrEnqueueEvent(ChampEvent.builder()
562 .operation(ChampOperation.STORE)
563 .entity(storedRelationship)
568 return storedRelationship;
572 public ChampRelationship replaceRelationship(ChampRelationship relationship)
573 throws ChampUnmarshallingException,
574 ChampMarshallingException,
575 ChampSchemaViolationException,
576 ChampRelationshipNotExistsException, ChampTransactionException {
577 return replaceRelationship(relationship, Optional.empty());
581 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
582 throws ChampUnmarshallingException,
583 ChampMarshallingException,
584 ChampSchemaViolationException,
585 ChampRelationshipNotExistsException, ChampTransactionException {
587 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
589 if(replacedRelationship != null) {
591 // Update the event stream with the current operation.
592 logOrEnqueueEvent(ChampEvent.builder()
593 .operation(ChampOperation.REPLACE)
594 .entity(replacedRelationship)
599 return replacedRelationship;
603 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
604 deleteRelationship(relationship, Optional.empty());
608 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
610 executeDeleteRelationship(relationship, transaction);
612 // Update the event stream with the current operation.
613 logOrEnqueueEvent(ChampEvent.builder()
614 .operation(ChampOperation.DELETE)
615 .entity(relationship)
621 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
622 return storePartition(partition, Optional.empty());
626 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
628 ChampPartition storedPartition = executeStorePartition(partition, transaction);
630 if(storedPartition != null) {
632 // Update the event stream with the current operation.
633 logOrEnqueueEvent(ChampEvent.builder()
634 .operation(ChampOperation.STORE)
635 .entity(storedPartition)
640 return storedPartition;
644 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
645 deletePartition(graph, Optional.empty());
649 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
651 executeDeletePartition(graph, transaction);
653 // Update the event stream with the current operation.
654 logOrEnqueueEvent(ChampEvent.builder()
655 .operation(ChampOperation.DELETE)
662 public void storeObjectIndex(ChampObjectIndex index) {
664 executeStoreObjectIndex(index);
666 // Update the event stream with the current operation.
667 logEvent(ChampEvent.builder()
668 .operation(ChampOperation.STORE)
674 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
676 // Retrieve the index that we are deleting before it's gone, so that we can
677 // report it to the event stream.
678 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
680 executeDeleteObjectIndex(indexName);
682 if(indexToDelete.isPresent()) {
683 // Update the event stream with the current operation.
684 logEvent(ChampEvent.builder()
685 .operation(ChampOperation.DELETE)
686 .entity(indexToDelete.get())
692 public void storeRelationshipIndex(ChampRelationshipIndex index) {
694 executeStoreRelationshipIndex(index);
696 // Update the event stream with the current operation.
697 logEvent(ChampEvent.builder()
698 .operation(ChampOperation.STORE)
704 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
706 // Retrieve the index that we are deleting before it's gone, so that we can
707 // report it to the event stream.
708 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
710 executeDeleteRelationshipIndex(indexName);
712 if(indexToDelete.isPresent()) {
713 // Update the event stream with the current operation.
714 logEvent(ChampEvent.builder()
715 .operation(ChampOperation.DELETE)
716 .entity(indexToDelete.get())
721 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
723 if(!transaction.isPresent()) {
724 // Update the event stream with the current operation.
728 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
729 event.setDbTransactionId ( transaction.get ().id () );
730 transaction.get().logEvent(event);
735 * Submits an event to be published to the event stream.
737 * @param anEvent - The event to be published.
739 public void logEvent(ChampEvent anEvent) {
741 if(eventQueue == null) {
745 logger.info(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_INFO,
746 "Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
747 if(logger.isDebugEnabled()) {
748 logger.debug("Event payload: " + anEvent.toString());
751 // Try to submit the event to be published to the event bus.
752 if(!eventQueue.offer(anEvent)) {
753 logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
754 "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
760 * This class implements the worker threads for our thread pool which are responsible for
761 * pulling the next outgoing event from the internal buffer and forwarding them to the event
764 * Each publish operation is performed synchronously, so that the thread will only move on
765 * to the next available event once it has actually published the current event to the bus.
767 private class EventPublisherWorker implements Runnable {
769 /** Partition key to use when publishing events to the event stream. We WANT all events
770 * to go to a single partition, so we are just using a hard-coded key for every event. */
771 private static final String EVENTS_PARTITION_KEY = "champEventKey";
778 ChampEvent event = null;
781 // Get the next event to be published from the queue.
782 event = eventQueue.take();
784 } catch (InterruptedException e) {
786 // Restore the interrupted status.
787 Thread.currentThread().interrupt();
790 // Create new envelope containing an event header and ChampEvent
791 ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
793 // Try publishing the event to the event bus. This call will block until
795 producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
797 } catch (Exception e) {
799 logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
800 "Failed to publish event to event bus: " + e.getMessage());