2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
35 import org.onap.aai.champcore.ChampCapabilities;
36 import org.onap.aai.champcore.ChampGraph;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
39 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
40 import org.onap.aai.champcore.event.envelope.ChampEventHeader;
41 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
43 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
45 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
46 import org.onap.aai.champcore.exceptions.ChampTransactionException;
47 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
48 import org.onap.aai.champcore.model.ChampObject;
49 import org.onap.aai.champcore.model.ChampObjectConstraint;
50 import org.onap.aai.champcore.model.ChampObjectIndex;
51 import org.onap.aai.champcore.model.ChampPartition;
52 import org.onap.aai.champcore.model.ChampRelationship;
53 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
54 import org.onap.aai.champcore.model.ChampRelationshipIndex;
55 import org.onap.aai.champcore.model.ChampSchema;
56 import org.onap.aai.event.api.EventPublisher;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
63 * This class provides the hooks to allow Champ operations to be logged to an event
66 public abstract class AbstractLoggingChampGraph implements ChampGraph {
68 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
70 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
71 public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
72 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
73 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
75 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
77 public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
78 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
79 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
81 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
85 * Creates or updates a vertex in the graph data store.
87 * If a transaction context is not provided, then a transaction will be automatically
88 * created and committed for this operation only, otherwise, the supplied transaction
89 * will be used and it will be up to the caller to commit the transaction at its
92 * @param object - The vertex to be created or updated.
93 * @param transaction - Optional transaction context to perform the operation in.
95 * @return - The vertex, as created, marshaled as a {@link ChampObject}
97 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
98 * into the backend representation
99 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
100 * by {@link ChampGraph#retrieveSchema}
101 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
102 * is not present or object not found in the graph
103 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
105 public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
108 * Updates an existing vertex in the graph store.
110 * If a transaction context is not provided, then a transaction will be automatically
111 * created and committed for this operation only, otherwise, the supplied transaction
112 * will be used and it will be up to the caller to commit the transaction at its
115 * @param object - The vertex to be created or updated.
116 * @param transaction - Optional transaction context to perform the operation in.
118 * @return - The updated vertex, marshaled as a {@link ChampObject}
120 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
121 * the backend representation
122 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
123 * by {@link ChampGraph#retrieveSchema}
124 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
125 * is not present or object not found in the graph
126 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
128 public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
131 * Deletes an existing vertex from the graph store.
133 * If a transaction context is not provided, then a transaction will be automatically
134 * created and committed for this operation only, otherwise, the supplied transaction
135 * will be used and it will be up to the caller to commit the transaction at its
138 * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
139 * @param transaction - Optional transaction context to perform the operation in.
141 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
142 * is not present or object not found in the graph
143 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
145 public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
148 * Creates or updates an edge in the graph data store.
150 * If a transaction context is not provided, then a transaction will be automatically
151 * created and committed for this operation only, otherwise, the supplied transaction
152 * will be used and it will be up to the caller to commit the transaction at its
155 * @param relationship - The ChampRelationship that you wish to store in the graph
156 * @param transaction - Optional transaction context to perform the operation in.
158 * @return - The {@link ChampRelationship} as it was stored.
160 * @throws ChampUnmarshallingException - If the edge which was created could not be
161 * unmarshalled into a ChampRelationship
162 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
163 * marshalled into the backend representation
164 * @throws ChampObjectNotExistsException - If either the source or target object referenced
165 * by this relationship does not exist in the graph
166 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
167 * specifed by {@link ChampGraph#retrieveSchema}
168 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
169 * but the object cannot be found in the graph
170 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
172 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
175 * Replaces an existing edge in the graph data store.
177 * If a transaction context is not provided, then a transaction will be automatically
178 * created and committed for this operation only, otherwise, the supplied transaction
179 * will be used and it will be up to the caller to commit the transaction at its
182 * @param relationship - The ChampRelationship that you wish to replace in the graph
183 * @param transaction - Optional transaction context to perform the operation in.
185 * @return - The {@link ChampRelationship} as it was stored.
187 * @throws ChampUnmarshallingException - If the edge which was created could not be
188 * unmarshalled into a ChampRelationship
189 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
190 * marshalled into the backend representation
191 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
192 * specifed by {@link ChampGraph#retrieveSchema}
193 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
194 * but the object cannot be found in the graph
195 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
197 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
200 * Removes an edge from the graph data store.
202 * If a transaction context is not provided, then a transaction will be automatically
203 * created and committed for this operation only, otherwise, the supplied transaction
204 * will be used and it will be up to the caller to commit the transaction at its
207 * @param relationship - The ChampRelationship that you wish to remove from the graph.
208 * @param transaction - Optional transaction context to perform the operation in.
210 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
211 * but the object cannot be found in the graph
212 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
214 public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
217 * Create or update a {@link ChampPartition}.
219 * If a transaction context is not provided, then a transaction will be automatically
220 * created and committed for this operation only, otherwise, the supplied transaction
221 * will be used and it will be up to the caller to commit the transaction at its
224 * @param partition - The ChampPartition that you wish to create or update in the graph.
225 * @param transaction - Optional transaction context to perform the operation in.
227 * @return - The {@link ChampPartition} as it was stored.
229 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
230 * specifed by {@link ChampGraph#retrieveSchema}
231 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
232 * but the object cannot be found in the graph
233 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
234 * marshalled into the backend representation
235 * @throws ChampObjectNotExistsException - If either the source or target object referenced
236 * by this relationship does not exist in the graph
237 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
239 public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
242 * Removes a partition from the graph.
244 * If a transaction context is not provided, then a transaction will be automatically
245 * created and committed for this operation only, otherwise, the supplied transaction
246 * will be used and it will be up to the caller to commit the transaction at its
249 * @param graph - The partition to be removed.
250 * @param transaction - Optional transaction context to perform the operation in.
252 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
254 public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
257 * Create or update an object index in the graph.
259 * @param index - The object index to be created/updated.
261 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
263 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
264 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
265 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
266 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
267 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
268 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
269 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
270 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
271 public abstract ChampSchema retrieveSchema();
272 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
273 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
274 public abstract void deleteSchema();
275 public abstract ChampCapabilities capabilities();
279 public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
280 public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
282 public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
283 public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
285 public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
289 /** Number of events that can be queued up for publication before we begin dropping
291 private Integer eventQueueCapacity;
293 /** Number of event publisher worker threads. */
294 private Integer eventStreamPublisherPoolSize;
296 /** Pool of worker threads that do the work of publishing the events to the event bus. */
297 protected ThreadPoolExecutor publisherPool;
299 /** Client used for publishing events to the event bus. */
300 protected EventPublisher producer;
302 /** Internal queue where outgoing events will be buffered until they can be serviced by
303 * the event publisher worker threads. */
304 protected BlockingQueue<ChampEvent> eventQueue;
308 * Thread factory for the event producer workers.
310 private class ProducerWorkerThreadFactory implements ThreadFactory {
312 private AtomicInteger threadNumber = new AtomicInteger(1);
314 public Thread newThread(Runnable r) {
315 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
321 * Create a new instance of the AbstractLoggingChampGraph.
323 * @param properties - Set of configuration properties for this graph instance.
325 protected AbstractLoggingChampGraph(Map<String, Object> properties) {
327 // Extract the necessary parameters from the configuration properties.
328 configure(properties);
330 // Make sure we were passed an event producer as one of our properties, otherwise
331 // there is really nothing more we can do...
332 if(producer == null) {
333 logger.error("No event stream producer was supplied.");
334 logger.error("NOTE!! Champ events will NOT be published to the event stream!");
338 // Create the blocking queue that we will use to buffer events that we want
339 // published to the event bus.
340 eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
342 // Create the executor pool that will do the work of publishing events to the event bus.
344 (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
345 new ProducerWorkerThreadFactory());
349 // Start up the producer worker threads.
350 for(int i=0; i<eventStreamPublisherPoolSize; i++) {
351 publisherPool.submit(new EventPublisherWorker());
354 } catch (Exception e) {
356 logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
357 logger.error("NOTE!! Champ events may NOT be published to the event stream!");
364 * Process the configuration properties supplied for this graph instance.
366 * @param properties - Configuration parameters.
368 private void configure(Map<String, Object> properties) {
370 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
373 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
374 eventStreamPublisherPoolSize =
375 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
379 public void setProducer(EventPublisher aProducer) {
381 producer = aProducer;
384 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
386 if(properties.containsKey(property)) {
387 return properties.get(property);
394 public void shutdown() {
396 if(publisherPool != null) {
397 publisherPool.shutdown();
400 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
401 } catch (InterruptedException e) {
402 logger.warn("Termination interrupted");
403 Thread.currentThread().interrupt();
407 if(producer != null) {
412 } catch (Exception e) {
413 logger.error("Failed to stop event stream producer: " + e.getMessage());
419 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
423 // Commit the transaction.
424 transaction.commit();
426 } catch (ChampTransactionException e) {
428 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
430 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
431 for(ChampEvent event : enqueuedEvents) {
433 logger.debug("Graph event " + event.toString() + " not published.");
438 // Now that the transaction has been successfully committed, we need
439 // to log the events that were produced within that transaction's
441 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
442 for(ChampEvent event : enqueuedEvents) {
448 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
450 // Rollback the transaction.
451 transaction.rollback();
455 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
456 return storeObject(object, Optional.empty());
460 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
462 ChampObject storedObject = executeStoreObject(object, transaction);
464 if(storedObject != null) {
466 logOrEnqueueEvent(ChampEvent.builder()
467 .operation(ChampOperation.STORE)
468 .entity(storedObject)
477 public ChampObject replaceObject(ChampObject object)
478 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
480 return replaceObject(object, Optional.empty());
484 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
485 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
487 ChampObject replacedObject = executeReplaceObject(object, transaction);
489 if(replacedObject != null) {
491 logOrEnqueueEvent(ChampEvent.builder()
492 .operation(ChampOperation.REPLACE)
493 .entity(replacedObject)
498 return replacedObject;
502 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
503 deleteObject(key, Optional.empty());
507 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
509 // Retrieve the object that we are deleting before it's gone, so that we can
510 // report it to the event stream.
511 Optional<ChampObject> objectToDelete = Optional.empty();
513 objectToDelete = retrieveObject(key, transaction);
515 } catch (ChampUnmarshallingException e) {
516 logger.error("Unable to generate delete object log: " + e.getMessage());
519 executeDeleteObject(key, transaction);
521 if(objectToDelete.isPresent()) {
522 // Update the event stream with the current operation.
523 logOrEnqueueEvent(ChampEvent.builder()
524 .operation(ChampOperation.DELETE)
525 .entity(objectToDelete.get())
532 public ChampRelationship storeRelationship(ChampRelationship relationship)
533 throws ChampUnmarshallingException,
534 ChampMarshallingException,
535 ChampObjectNotExistsException,
536 ChampSchemaViolationException,
537 ChampRelationshipNotExistsException, ChampTransactionException {
538 return storeRelationship(relationship, Optional.empty());
542 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
543 throws ChampUnmarshallingException,
544 ChampMarshallingException,
545 ChampObjectNotExistsException,
546 ChampSchemaViolationException,
547 ChampRelationshipNotExistsException, ChampTransactionException {
549 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
551 if(storedRelationship != null) {
553 // Update the event stream with the current operation.
554 logOrEnqueueEvent(ChampEvent.builder()
555 .operation(ChampOperation.STORE)
556 .entity(storedRelationship)
561 return storedRelationship;
565 public ChampRelationship replaceRelationship(ChampRelationship relationship)
566 throws ChampUnmarshallingException,
567 ChampMarshallingException,
568 ChampSchemaViolationException,
569 ChampRelationshipNotExistsException, ChampTransactionException {
570 return replaceRelationship(relationship, Optional.empty());
574 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
575 throws ChampUnmarshallingException,
576 ChampMarshallingException,
577 ChampSchemaViolationException,
578 ChampRelationshipNotExistsException, ChampTransactionException {
580 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
582 if(replacedRelationship != null) {
584 // Update the event stream with the current operation.
585 logOrEnqueueEvent(ChampEvent.builder()
586 .operation(ChampOperation.REPLACE)
587 .entity(replacedRelationship)
592 return replacedRelationship;
596 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
597 deleteRelationship(relationship, Optional.empty());
601 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
603 executeDeleteRelationship(relationship, transaction);
605 // Update the event stream with the current operation.
606 logOrEnqueueEvent(ChampEvent.builder()
607 .operation(ChampOperation.DELETE)
608 .entity(relationship)
614 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
615 return storePartition(partition, Optional.empty());
619 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
621 ChampPartition storedPartition = executeStorePartition(partition, transaction);
623 if(storedPartition != null) {
625 // Update the event stream with the current operation.
626 logOrEnqueueEvent(ChampEvent.builder()
627 .operation(ChampOperation.STORE)
628 .entity(storedPartition)
633 return storedPartition;
637 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
638 deletePartition(graph, Optional.empty());
642 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
644 executeDeletePartition(graph, transaction);
646 // Update the event stream with the current operation.
647 logOrEnqueueEvent(ChampEvent.builder()
648 .operation(ChampOperation.DELETE)
655 public void storeObjectIndex(ChampObjectIndex index) {
657 executeStoreObjectIndex(index);
659 // Update the event stream with the current operation.
660 logEvent(ChampEvent.builder()
661 .operation(ChampOperation.STORE)
667 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
669 // Retrieve the index that we are deleting before it's gone, so that we can
670 // report it to the event stream.
671 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
673 executeDeleteObjectIndex(indexName);
675 if(indexToDelete.isPresent()) {
676 // Update the event stream with the current operation.
677 logEvent(ChampEvent.builder()
678 .operation(ChampOperation.DELETE)
679 .entity(indexToDelete.get())
685 public void storeRelationshipIndex(ChampRelationshipIndex index) {
687 executeStoreRelationshipIndex(index);
689 // Update the event stream with the current operation.
690 logEvent(ChampEvent.builder()
691 .operation(ChampOperation.STORE)
697 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
699 // Retrieve the index that we are deleting before it's gone, so that we can
700 // report it to the event stream.
701 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
703 executeDeleteRelationshipIndex(indexName);
705 if(indexToDelete.isPresent()) {
706 // Update the event stream with the current operation.
707 logEvent(ChampEvent.builder()
708 .operation(ChampOperation.DELETE)
709 .entity(indexToDelete.get())
714 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
716 if(!transaction.isPresent()) {
717 // Update the event stream with the current operation.
721 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
722 event.setDbTransactionId ( transaction.get ().id () );
723 transaction.get().logEvent(event);
728 * Submits an event to be published to the event stream.
730 * @param anEvent - The event to be published.
732 public void logEvent(ChampEvent anEvent) {
734 if(eventQueue == null) {
738 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
739 if(logger.isDebugEnabled()) {
740 logger.debug("Event payload: " + anEvent.toString());
743 // Try to submit the event to be published to the event bus.
744 if(!eventQueue.offer(anEvent)) {
745 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
751 * This class implements the worker threads for our thread pool which are responsible for
752 * pulling the next outgoing event from the internal buffer and forwarding them to the event
755 * Each publish operation is performed synchronously, so that the thread will only move on
756 * to the next available event once it has actually published the current event to the bus.
758 private class EventPublisherWorker implements Runnable {
760 /** Partition key to use when publishing events to the event stream. We WANT all events
761 * to go to a single partition, so we are just using a hard-coded key for every event. */
762 private static final String EVENTS_PARTITION_KEY = "champEventKey";
769 ChampEvent event = null;
772 // Get the next event to be published from the queue.
773 event = eventQueue.take();
775 } catch (InterruptedException e) {
777 // Restore the interrupted status.
778 Thread.currentThread().interrupt();
781 // Create new envelope containing an event header and ChampEvent
782 ChampEventEnvelope eventEnvelope = new ChampEventEnvelope(event);
784 // Try publishing the event to the event bus. This call will block until
786 producer.sendSync(EVENTS_PARTITION_KEY, eventEnvelope.toJson());
788 } catch (Exception e) {
790 logger.error("Failed to publish event to event bus: " + e.getMessage());