2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
/**
 * This class provides the hooks to allow Champ operations to be logged to an event
 * stream.
 */
65 public abstract class AbstractLoggingChampGraph implements ChampGraph {
/** Configuration key for the capacity of the internal event buffer. */
public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
/** Event buffer capacity used when {@link #PARAM_EVENT_QUEUE_CAPACITY} is not configured. */
public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
/** Configuration key for the number of event publisher worker threads. */
public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
/** Worker thread count used when {@link #PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE} is not configured. */
public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
/** Configuration key under which the {@link EventPublisher} instance is supplied. */
public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
// The two property names below are not referenced by the code visible in this class;
// presumably they are used by subclasses -- TODO confirm.
protected static final String KEY_PROPERTY_NAME = "aai-uuid";
protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";

/** Pool of worker threads that do the work of publishing the events to the event bus. */
protected ThreadPoolExecutor publisherPool;

/** Client used for publishing events to the event bus. */
protected EventPublisher producer;

/**
 * Internal queue where outgoing events will be buffered until they can be serviced by
 * the event publisher worker threads.
 */
protected BlockingQueue<ChampEvent> eventQueue;

/**
 * Number of events that can be queued up for publication before we begin dropping
 * events.
 */
private Integer eventQueueCapacity;

/** Number of event publisher worker threads. */
private Integer eventStreamPublisherPoolSize;

private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
/**
 * Create a new instance of the AbstractLoggingChampGraph.
 *
 * <p>Reads the event stream settings from {@code properties}. When an event producer
 * was supplied, the bounded event buffer is allocated and the pool of publisher worker
 * threads is started. Without a producer neither is created, and events handed to
 * {@link #logEvent(ChampEvent)} are discarded.
 *
 * @param properties - Set of configuration properties for this graph instance.
 */
protected AbstractLoggingChampGraph(Map<String, Object> properties) {

  // Extract the necessary parameters from the configuration properties.
  configure(properties);

  // Make sure we were passed an event producer as one of our properties, otherwise
  // there is really nothing more we can do...
  if (producer == null) {
    logger.error("No event stream producer was supplied.");
    logger.error("NOTE!! Champ events will NOT be published to the event stream!");
    return;
  }

  // Create the blocking queue that we will use to buffer events that we want
  // published to the event bus.
  eventQueue = new ArrayBlockingQueue<>(eventQueueCapacity);

  // Create the executor pool that will do the work of publishing events to the event bus.
  publisherPool =
      (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
          new ProducerWorkerThreadFactory());

  try {

    // Start up the producer worker threads.
    for (int i = 0; i < eventStreamPublisherPoolSize; i++) {
      publisherPool.submit(new EventPublisherWorker());
    }

  } catch (Exception e) {

    // Pass the exception itself as the last argument so that the stack trace is
    // preserved in the log rather than only the message text.
    logger.error("Failed to instantiate event stream producer thread due to: '{}'", e.getMessage(), e);
    logger.error("NOTE!! Champ events may NOT be published to the event stream!");
  }
}
// Read-only graph operations. They are declared abstract here so that the concrete graph
// implementation supplies them; this class adds no event publishing around them.
public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
/**
 * Creates or updates a vertex in the graph data store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #storeObject(ChampObject, Optional)}, which publishes a STORE
 * event when the returned object is not null.
 *
 * @param object      - The vertex to be created or updated.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The vertex, as created, marshaled as a {@link ChampObject}
 *
 * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled
 *                                         into the backend representation
 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
 *                                         by {@link ChampGraph#retrieveSchema}
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Updates an existing vertex in the graph store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #replaceObject(ChampObject, Optional)}, which publishes a REPLACE
 * event when the returned object is not null.
 *
 * @param object      - The vertex to be updated.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The updated vertex, marshaled as a {@link ChampObject}
 *
 * @throws ChampMarshallingException     - If the {@code object} is not able to be marshalled into
 *                                         the backend representation
 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specified
 *                                         by {@link ChampGraph#retrieveSchema}
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Deletes an existing vertex from the graph store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #deleteObject(Object, Optional)}, which looks the object up
 * beforehand so that it can be reported in the DELETE event.
 *
 * @param key         - The key of the ChampObject in the graph {@link ChampObject#getKey}
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
 *                                         is not present or object not found in the graph
 * @throws ChampTransactionException     - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
/**
 * Creates or updates an edge in the graph data store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #storeRelationship(ChampRelationship, Optional)}, which publishes
 * a STORE event when the returned relationship is not null.
 *
 * @param relationship - The ChampRelationship that you wish to store in the graph
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampRelationship} as it was stored.
 *
 * @throws ChampUnmarshallingException         - If the edge which was created could not be
 *                                               unmarshalled into a ChampRelationship
 * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampObjectNotExistsException       - If either the source or target object referenced
 *                                               by this relationship does not exist in the graph
 * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Replaces an existing edge in the graph data store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #replaceRelationship(ChampRelationship, Optional)}, which publishes
 * a REPLACE event when the returned relationship is not null.
 *
 * @param relationship - The ChampRelationship that you wish to replace in the graph
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampRelationship} as it was stored.
 *
 * @throws ChampUnmarshallingException         - If the edge which was created could not be
 *                                               unmarshalled into a ChampRelationship
 * @throws ChampMarshallingException           - If the {@code relationship} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampSchemaViolationException       - If the {@code relationship} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Removes an edge from the graph data store.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #deleteRelationship(ChampRelationship, Optional)}, which publishes
 * a DELETE event for the relationship afterwards.
 *
 * @param relationship - The ChampRelationship that you wish to remove from the graph.
 * @param transaction  - Optional transaction context to perform the operation in.
 *
 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but the object cannot be found in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
/**
 * Create or update a {@link ChampPartition}.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #storePartition(ChampPartition, Optional)}, which publishes a
 * STORE event when the returned partition is not null.
 *
 * <p>NOTE(review): the exception descriptions below were previously worded in terms of a
 * {@code relationship} parameter that this method does not have; they are restated for
 * the {@code partition} and should be confirmed against the concrete implementations.
 *
 * @param partition   - The ChampPartition that you wish to create or update in the graph.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @return - The {@link ChampPartition} as it was stored.
 *
 * @throws ChampSchemaViolationException       - If the {@code partition} violates the constraints
 *                                               specified by {@link ChampGraph#retrieveSchema}
 * @throws ChampRelationshipNotExistsException - If a relationship of the {@code partition} has
 *                                               {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
 *                                               but cannot be found in the graph
 * @throws ChampMarshallingException           - If the {@code partition} is not able to be
 *                                               marshalled into the backend representation
 * @throws ChampObjectNotExistsException       - If either the source or target object referenced
 *                                               by a relationship of the {@code partition} does
 *                                               not exist in the graph
 * @throws ChampTransactionException           - If an attempt to commit or rollback the transaction failed.
 */
public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
/**
 * Removes a partition from the graph.
 *
 * <p>If a transaction context is not provided, then a transaction will be automatically
 * created and committed for this operation only, otherwise, the supplied transaction
 * will be used and it will be up to the caller to commit the transaction at its
 * discretion.
 *
 * <p>Invoked by {@link #deletePartition(ChampPartition, Optional)}, which publishes a
 * DELETE event afterwards.
 *
 * @param graph       - The partition to be removed.
 * @param transaction - Optional transaction context to perform the operation in.
 *
 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
 */
public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
/**
 * Create or update an object index in the graph.
 *
 * <p>Invoked by {@link #storeObjectIndex(ChampObjectIndex)}, which publishes a STORE
 * event afterwards.
 *
 * @param index - The object index to be created/updated.
 */
public abstract void executeStoreObjectIndex(ChampObjectIndex index);
// Index and schema operations supplied by the concrete graph implementation. The
// execute* variants are wrapped by the same-named public methods of this class, which
// publish the corresponding event; the others are not wrapped by this class.
public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
public abstract ChampSchema retrieveSchema();
public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
public abstract void deleteSchema();
public abstract ChampCapabilities capabilities();
347 * Thread factory for the event producer workers.
349 private class ProducerWorkerThreadFactory implements ThreadFactory {
351 private AtomicInteger threadNumber = new AtomicInteger(1);
353 public Thread newThread(Runnable r) {
354 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
360 * Process the configuration properties supplied for this graph instance.
362 * @param properties - Configuration parameters.
364 private void configure(Map<String, Object> properties) {
366 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
369 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
370 eventStreamPublisherPoolSize =
371 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
375 public void setProducer(EventPublisher aProducer) {
377 producer = aProducer;
380 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
382 if(properties.containsKey(property)) {
383 return properties.get(property);
390 public void shutdown() {
392 if(publisherPool != null) {
393 publisherPool.shutdown();
396 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
397 } catch (InterruptedException e) {
398 logger.warn("Termination interrupted");
399 Thread.currentThread().interrupt();
403 if(producer != null) {
408 } catch (Exception e) {
409 logger.error("Failed to stop event stream producer: " + e.getMessage());
415 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
419 // Commit the transaction.
420 transaction.commit();
422 } catch (ChampTransactionException e) {
424 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
426 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
427 for(ChampEvent event : enqueuedEvents) {
429 logger.debug("Graph event " + event.toString() + " not published.");
434 // Now that the transaction has been successfully committed, we need
435 // to log the events that were produced within that transaction's
437 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
438 for(ChampEvent event : enqueuedEvents) {
444 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
446 // Rollback the transaction.
447 transaction.rollback();
451 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
452 return storeObject(object, Optional.empty());
456 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
458 ChampObject storedObject = executeStoreObject(object, transaction);
460 if(storedObject != null) {
462 logOrEnqueueEvent(ChampEvent.builder()
463 .operation(ChampOperation.STORE)
464 .entity(storedObject)
473 public ChampObject replaceObject(ChampObject object)
474 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
476 return replaceObject(object, Optional.empty());
480 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
481 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
483 ChampObject replacedObject = executeReplaceObject(object, transaction);
485 if(replacedObject != null) {
487 logOrEnqueueEvent(ChampEvent.builder()
488 .operation(ChampOperation.REPLACE)
489 .entity(replacedObject)
494 return replacedObject;
498 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
499 deleteObject(key, Optional.empty());
503 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
505 // Retrieve the object that we are deleting before it's gone, so that we can
506 // report it to the event stream.
507 Optional<ChampObject> objectToDelete = Optional.empty();
509 objectToDelete = retrieveObject(key, transaction);
511 } catch (ChampUnmarshallingException e) {
512 logger.error("Unable to generate delete object log: " + e.getMessage());
515 executeDeleteObject(key, transaction);
517 if(objectToDelete.isPresent()) {
518 // Update the event stream with the current operation.
519 logOrEnqueueEvent(ChampEvent.builder()
520 .operation(ChampOperation.DELETE)
521 .entity(objectToDelete.get())
528 public ChampRelationship storeRelationship(ChampRelationship relationship)
529 throws ChampUnmarshallingException,
530 ChampMarshallingException,
531 ChampObjectNotExistsException,
532 ChampSchemaViolationException,
533 ChampRelationshipNotExistsException, ChampTransactionException {
534 return storeRelationship(relationship, Optional.empty());
538 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
539 throws ChampUnmarshallingException,
540 ChampMarshallingException,
541 ChampObjectNotExistsException,
542 ChampSchemaViolationException,
543 ChampRelationshipNotExistsException, ChampTransactionException {
545 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
547 if(storedRelationship != null) {
549 // Update the event stream with the current operation.
550 logOrEnqueueEvent(ChampEvent.builder()
551 .operation(ChampOperation.STORE)
552 .entity(storedRelationship)
557 return storedRelationship;
561 public ChampRelationship replaceRelationship(ChampRelationship relationship)
562 throws ChampUnmarshallingException,
563 ChampMarshallingException,
564 ChampSchemaViolationException,
565 ChampRelationshipNotExistsException, ChampTransactionException {
566 return replaceRelationship(relationship, Optional.empty());
570 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
571 throws ChampUnmarshallingException,
572 ChampMarshallingException,
573 ChampSchemaViolationException,
574 ChampRelationshipNotExistsException, ChampTransactionException {
576 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
578 if(replacedRelationship != null) {
580 // Update the event stream with the current operation.
581 logOrEnqueueEvent(ChampEvent.builder()
582 .operation(ChampOperation.REPLACE)
583 .entity(replacedRelationship)
588 return replacedRelationship;
592 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
593 deleteRelationship(relationship, Optional.empty());
597 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
599 executeDeleteRelationship(relationship, transaction);
601 // Update the event stream with the current operation.
602 logOrEnqueueEvent(ChampEvent.builder()
603 .operation(ChampOperation.DELETE)
604 .entity(relationship)
610 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
611 return storePartition(partition, Optional.empty());
615 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
617 ChampPartition storedPartition = executeStorePartition(partition, transaction);
619 if(storedPartition != null) {
621 // Update the event stream with the current operation.
622 logOrEnqueueEvent(ChampEvent.builder()
623 .operation(ChampOperation.STORE)
624 .entity(storedPartition)
629 return storedPartition;
633 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
634 deletePartition(graph, Optional.empty());
638 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
640 executeDeletePartition(graph, transaction);
642 // Update the event stream with the current operation.
643 logOrEnqueueEvent(ChampEvent.builder()
644 .operation(ChampOperation.DELETE)
651 public void storeObjectIndex(ChampObjectIndex index) {
653 executeStoreObjectIndex(index);
655 // Update the event stream with the current operation.
656 logEvent(ChampEvent.builder()
657 .operation(ChampOperation.STORE)
663 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
665 // Retrieve the index that we are deleting before it's gone, so that we can
666 // report it to the event stream.
667 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
669 executeDeleteObjectIndex(indexName);
671 if(indexToDelete.isPresent()) {
672 // Update the event stream with the current operation.
673 logEvent(ChampEvent.builder()
674 .operation(ChampOperation.DELETE)
675 .entity(indexToDelete.get())
681 public void storeRelationshipIndex(ChampRelationshipIndex index) {
683 executeStoreRelationshipIndex(index);
685 // Update the event stream with the current operation.
686 logEvent(ChampEvent.builder()
687 .operation(ChampOperation.STORE)
693 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
695 // Retrieve the index that we are deleting before it's gone, so that we can
696 // report it to the event stream.
697 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
699 executeDeleteRelationshipIndex(indexName);
701 if(indexToDelete.isPresent()) {
702 // Update the event stream with the current operation.
703 logEvent(ChampEvent.builder()
704 .operation(ChampOperation.DELETE)
705 .entity(indexToDelete.get())
710 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
712 if(!transaction.isPresent()) {
713 // Update the event stream with the current operation.
717 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
718 event.setDbTransactionId ( transaction.get ().id () );
719 transaction.get().logEvent(event);
724 * Submits an event to be published to the event stream.
726 * @param anEvent - The event to be published.
728 public void logEvent(ChampEvent anEvent) {
730 if(eventQueue == null) {
734 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
735 if(logger.isDebugEnabled()) {
736 logger.debug("Event payload: " + anEvent.toString());
739 // Try to submit the event to be published to the event bus.
740 if(!eventQueue.offer(anEvent)) {
741 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
747 * This class implements the worker threads for our thread pool which are responsible for
748 * pulling the next outgoing event from the internal buffer and forwarding them to the event
751 * Each publish operation is performed synchronously, so that the thread will only move on
752 * to the next available event once it has actually published the current event to the bus.
754 private class EventPublisherWorker implements Runnable {
756 /** Partition key to use when publishing events to the event stream. We WANT all events
757 * to go to a single partition, so we are just using a hard-coded key for every event. */
758 private static final String EVENTS_PARTITION_KEY = "champEventKey";
765 ChampEvent event = null;
768 // Get the next event to be published from the queue.
769 event = eventQueue.take();
771 } catch (InterruptedException e) {
773 // Restore the interrupted status.
774 Thread.currentThread().interrupt();
777 // Create new envelope containing an event header and ChampEvent
778 ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
780 // Try publishing the event to the event bus. This call will block until
782 producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
784 } catch (Exception e) {
786 logger.error("Failed to publish event to event bus: " + e.getMessage());