2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
24 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.stream.Stream;
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampGraph;
38 import org.onap.aai.champcore.ChampTransaction;
39 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
40 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
42 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
44 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
45 import org.onap.aai.champcore.exceptions.ChampTransactionException;
46 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
47 import org.onap.aai.champcore.model.ChampObject;
48 import org.onap.aai.champcore.model.ChampObjectConstraint;
49 import org.onap.aai.champcore.model.ChampObjectIndex;
50 import org.onap.aai.champcore.model.ChampPartition;
51 import org.onap.aai.champcore.model.ChampRelationship;
52 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
53 import org.onap.aai.champcore.model.ChampRelationshipIndex;
54 import org.onap.aai.champcore.model.ChampSchema;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 import org.onap.aai.event.api.EventPublisher;
63 * This class provides the hooks to allow Champ operations to be logged to an event
66 public abstract class AbstractLoggingChampGraph implements ChampGraph {
68 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
70 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
71 public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
72 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams) throws ChampTransactionException;
73 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
75 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException, ChampTransactionException;
77 public abstract Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
78 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
79 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
80 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
81 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
85 * Creates or updates a vertex in the graph data store.
87 * If a transaction context is not provided, then a transaction will be automatically
88 * created and committed for this operation only, otherwise, the supplied transaction
89 * will be used and it will be up to the caller to commit the transaction at its
92 * @param object - The vertex to be created or updated.
93 * @param transaction - Optional transaction context to perform the operation in.
95 * @return - The vertex, as created, marshaled as a {@link ChampObject}
97 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled
98 * into the backend representation
99 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
100 * by {@link ChampGraph#retrieveSchema}
101 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
102 * is not present or object not found in the graph
103 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
105 public abstract ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
108 * Updates an existing vertex in the graph store.
110 * If a transaction context is not provided, then a transaction will be automatically
111 * created and committed for this operation only, otherwise, the supplied transaction
112 * will be used and it will be up to the caller to commit the transaction at its
115 * @param object - The vertex to be created or updated.
116 * @param transaction - Optional transaction context to perform the operation in.
118 * @return - The updated vertex, marshaled as a {@link ChampObject}
120 * @throws ChampMarshallingException - If the {@code object} is not able to be marshalled into
121 * the backend representation
122 * @throws ChampSchemaViolationException - If the {@code object} violates the constraints specifed
123 * by {@link ChampGraph#retrieveSchema}
124 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
125 * is not present or object not found in the graph
126 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
128 public abstract ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException;
131 * Deletes an existing vertex from the graph store.
133 * If a transaction context is not provided, then a transaction will be automatically
134 * created and committed for this operation only, otherwise, the supplied transaction
135 * will be used and it will be up to the caller to commit the transaction at its
138 * @param key - The key of the ChampObject in the graph {@link ChampObject#getKey}
139 * @param transaction - Optional transaction context to perform the operation in.
141 * @throws ChampObjectNotExistsException - If {@link org.onap.aai.champcore.model.ChampObject#getKey}
142 * is not present or object not found in the graph
143 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
145 public abstract void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException;
148 * Creates or updates an edge in the graph data store.
150 * If a transaction context is not provided, then a transaction will be automatically
151 * created and committed for this operation only, otherwise, the supplied transaction
152 * will be used and it will be up to the caller to commit the transaction at its
155 * @param relationship - The ChampRelationship that you wish to store in the graph
156 * @param transaction - Optional transaction context to perform the operation in.
158 * @return - The {@link ChampRelationship} as it was stored.
160 * @throws ChampUnmarshallingException - If the edge which was created could not be
161 * unmarshalled into a ChampRelationship
162 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
163 * marshalled into the backend representation
164 * @throws ChampObjectNotExistsException - If either the source or target object referenced
165 * by this relationship does not exist in the graph
166 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
167 * specifed by {@link ChampGraph#retrieveSchema}
168 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
169 * but the object cannot be found in the graph
170 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
172 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
175 * Replaces an existing edge in the graph data store.
177 * If a transaction context is not provided, then a transaction will be automatically
178 * created and committed for this operation only, otherwise, the supplied transaction
179 * will be used and it will be up to the caller to commit the transaction at its
182 * @param relationship - The ChampRelationship that you wish to replace in the graph
183 * @param transaction - Optional transaction context to perform the operation in.
185 * @return - The {@link ChampRelationship} as it was stored.
187 * @throws ChampUnmarshallingException - If the edge which was created could not be
188 * unmarshalled into a ChampRelationship
189 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
190 * marshalled into the backend representation
191 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
192 * specifed by {@link ChampGraph#retrieveSchema}
193 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
194 * but the object cannot be found in the graph
195 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
197 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampTransactionException;
200 * Removes an edge from the graph data store.
202 * If a transaction context is not provided, then a transaction will be automatically
203 * created and committed for this operation only, otherwise, the supplied transaction
204 * will be used and it will be up to the caller to commit the transaction at its
207 * @param relationship - The ChampRelationship that you wish to remove from the graph.
208 * @param transaction - Optional transaction context to perform the operation in.
210 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
211 * but the object cannot be found in the graph
212 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
214 public abstract void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException;
217 * Create or update a {@link ChampPartition}.
219 * If a transaction context is not provided, then a transaction will be automatically
220 * created and committed for this operation only, otherwise, the supplied transaction
221 * will be used and it will be up to the caller to commit the transaction at its
224 * @param partition - The ChampPartition that you wish to create or update in the graph.
225 * @param transaction - Optional transaction context to perform the operation in.
227 * @return - The {@link ChampPartition} as it was stored.
229 * @throws ChampSchemaViolationException - If the {@code relationship} violates the constraints
230 * specifed by {@link ChampGraph#retrieveSchema}
231 * @throws ChampRelationshipNotExistsException - If {@link org.onap.aai.champcore.model.ChampRelationship#getKey}.isPresent()
232 * but the object cannot be found in the graph
233 * @throws ChampMarshallingException - If the {@code relationship} is not able to be
234 * marshalled into the backend representation
235 * @throws ChampObjectNotExistsException - If either the source or target object referenced
236 * by this relationship does not exist in the graph
237 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
239 public abstract ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException;
242 * Removes a partition from the graph.
244 * If a transaction context is not provided, then a transaction will be automatically
245 * created and committed for this operation only, otherwise, the supplied transaction
246 * will be used and it will be up to the caller to commit the transaction at its
249 * @param graph - The partition to be removed.
250 * @param transaction - Optional transaction context to perform the operation in.
252 * @throws ChampTransactionException - If an attempt to commit or rollback the transaction failed.
254 public abstract void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException;
257 * Create or update an object index in the graph.
259 * @param index - The object index to be created/updated.
261 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
263 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
264 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
265 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
266 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
267 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
268 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
269 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
270 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
271 public abstract ChampSchema retrieveSchema();
272 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
273 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
274 public abstract void deleteSchema();
275 public abstract ChampCapabilities capabilities();
279 public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
280 public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
282 public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
283 public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
285 public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
289 /** Number of events that can be queued up for publication before we begin dropping
291 private Integer eventQueueCapacity;
293 /** Number of event publisher worker threads. */
294 private Integer eventStreamPublisherPoolSize;
296 /** Pool of worker threads that do the work of publishing the events to the event bus. */
297 protected ThreadPoolExecutor publisherPool;
299 /** Client used for publishing events to the event bus. */
300 protected EventPublisher producer;
302 /** Internal queue where outgoing events will be buffered until they can be serviced by
303 * the event publisher worker threads. */
304 protected BlockingQueue<ChampEvent> eventQueue;
308 * Thread factory for the event producer workers.
310 private class ProducerWorkerThreadFactory implements ThreadFactory {
312 private AtomicInteger threadNumber = new AtomicInteger(1);
314 public Thread newThread(Runnable r) {
315 return new Thread(r, "champEventStreamPublisher-" + threadNumber.getAndIncrement());
321 * Create a new instance of the AbstractLoggingChampGraph.
323 * @param properties - Set of configuration properties for this graph instance.
325 protected AbstractLoggingChampGraph(Map<String, Object> properties) {
327 // Extract the necessary parameters from the configuration properties.
328 configure(properties);
330 // Make sure we were passed an event producer as one of our properties, otherwise
331 // there is really nothing more we can do...
332 if(producer == null) {
333 logger.error("No event stream producer was supplied.");
334 logger.error("NOTE!! Champ events will NOT be published to the event stream!");
338 // Create the blocking queue that we will use to buffer events that we want
339 // published to the event bus.
340 eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
342 // Create the executor pool that will do the work of publishing events to the event bus.
344 (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
345 new ProducerWorkerThreadFactory());
349 // Start up the producer worker threads.
350 for(int i=0; i<eventStreamPublisherPoolSize; i++) {
351 publisherPool.submit(new EventPublisherWorker());
354 } catch (Exception e) {
356 logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
357 logger.error("NOTE!! Champ events may NOT be published to the event stream!");
364 * Process the configuration properties supplied for this graph instance.
366 * @param properties - Configuration parameters.
368 private void configure(Map<String, Object> properties) {
370 producer = (EventPublisher) properties.get(PARAM_EVENT_STREAM_PRODUCER);
373 (Integer) getProperty(properties, PARAM_EVENT_QUEUE_CAPACITY, DEFAULT_EVENT_QUEUE_CAPACITY);
374 eventStreamPublisherPoolSize =
375 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
379 public void setProducer(EventPublisher aProducer) {
381 producer = aProducer;
384 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
386 if(properties.containsKey(property)) {
387 return properties.get(property);
394 public void shutdown() {
396 if(publisherPool != null) {
397 publisherPool.shutdown();
400 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
401 } catch (InterruptedException e) {}
404 if(producer != null) {
409 } catch (Exception e) {
410 logger.error("Failed to stop event stream producer: " + e.getMessage());
416 public void commitTransaction(ChampTransaction transaction) throws ChampTransactionException {
420 // Commit the transaction.
421 transaction.commit();
423 } catch (ChampTransactionException e) {
425 logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
427 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
428 for(ChampEvent event : enqueuedEvents) {
430 logger.debug("Graph event " + event.toString() + " not published.");
435 // Now that the transaction has been successfully committed, we need
436 // to log the events that were produced within that transaction's
438 List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
439 for(ChampEvent event : enqueuedEvents) {
445 public void rollbackTransaction(ChampTransaction transaction) throws ChampTransactionException {
447 // Rollback the transaction.
448 transaction.rollback();
452 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
453 return storeObject(object, Optional.empty());
457 public ChampObject storeObject(ChampObject object, Optional<ChampTransaction> transaction) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
459 ChampObject storedObject = executeStoreObject(object, transaction);
461 if(storedObject != null) {
463 logOrEnqueueEvent(ChampEvent.builder()
464 .operation(ChampOperation.STORE)
465 .entity(storedObject)
474 public ChampObject replaceObject(ChampObject object)
475 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
477 return replaceObject(object, Optional.empty());
481 public ChampObject replaceObject(ChampObject object, Optional<ChampTransaction> transaction)
482 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException, ChampTransactionException {
484 ChampObject replacedObject = executeReplaceObject(object, transaction);
486 if(replacedObject != null) {
488 logOrEnqueueEvent(ChampEvent.builder()
489 .operation(ChampOperation.REPLACE)
490 .entity(replacedObject)
495 return replacedObject;
499 public void deleteObject(Object key) throws ChampObjectNotExistsException, ChampTransactionException {
500 deleteObject(key, Optional.empty());
504 public void deleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException, ChampTransactionException {
506 // Retrieve the object that we are deleting before it's gone, so that we can
507 // report it to the event stream.
508 Optional<ChampObject> objectToDelete = Optional.empty();
510 objectToDelete = retrieveObject(key, transaction);
512 } catch (ChampUnmarshallingException e) {
513 logger.error("Unable to generate delete object log: " + e.getMessage());
516 executeDeleteObject(key, transaction);
518 if(objectToDelete.isPresent()) {
519 // Update the event stream with the current operation.
520 logOrEnqueueEvent(ChampEvent.builder()
521 .operation(ChampOperation.DELETE)
522 .entity(objectToDelete.get())
529 public ChampRelationship storeRelationship(ChampRelationship relationship)
530 throws ChampUnmarshallingException,
531 ChampMarshallingException,
532 ChampObjectNotExistsException,
533 ChampSchemaViolationException,
534 ChampRelationshipNotExistsException, ChampTransactionException {
535 return storeRelationship(relationship, Optional.empty());
539 public ChampRelationship storeRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
540 throws ChampUnmarshallingException,
541 ChampMarshallingException,
542 ChampObjectNotExistsException,
543 ChampSchemaViolationException,
544 ChampRelationshipNotExistsException, ChampTransactionException {
546 ChampRelationship storedRelationship = executeStoreRelationship(relationship, transaction);
548 if(storedRelationship != null) {
550 // Update the event stream with the current operation.
551 logOrEnqueueEvent(ChampEvent.builder()
552 .operation(ChampOperation.STORE)
553 .entity(storedRelationship)
558 return storedRelationship;
562 public ChampRelationship replaceRelationship(ChampRelationship relationship)
563 throws ChampUnmarshallingException,
564 ChampMarshallingException,
565 ChampSchemaViolationException,
566 ChampRelationshipNotExistsException, ChampTransactionException {
567 return replaceRelationship(relationship, Optional.empty());
571 public ChampRelationship replaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
572 throws ChampUnmarshallingException,
573 ChampMarshallingException,
574 ChampSchemaViolationException,
575 ChampRelationshipNotExistsException, ChampTransactionException {
577 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship, transaction);
579 if(replacedRelationship != null) {
581 // Update the event stream with the current operation.
582 logOrEnqueueEvent(ChampEvent.builder()
583 .operation(ChampOperation.REPLACE)
584 .entity(replacedRelationship)
589 return replacedRelationship;
593 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException, ChampTransactionException {
594 deleteRelationship(relationship, Optional.empty());
598 public void deleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException, ChampTransactionException {
600 executeDeleteRelationship(relationship, transaction);
602 // Update the event stream with the current operation.
603 logOrEnqueueEvent(ChampEvent.builder()
604 .operation(ChampOperation.DELETE)
605 .entity(relationship)
611 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
612 return storePartition(partition, Optional.empty());
616 public ChampPartition storePartition(ChampPartition partition, Optional<ChampTransaction> transaction) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException, ChampTransactionException {
618 ChampPartition storedPartition = executeStorePartition(partition, transaction);
620 if(storedPartition != null) {
622 // Update the event stream with the current operation.
623 logOrEnqueueEvent(ChampEvent.builder()
624 .operation(ChampOperation.STORE)
625 .entity(storedPartition)
630 return storedPartition;
634 public void deletePartition(ChampPartition graph) throws ChampTransactionException{
635 deletePartition(graph, Optional.empty());
639 public void deletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) throws ChampTransactionException {
641 executeDeletePartition(graph, transaction);
643 // Update the event stream with the current operation.
644 logOrEnqueueEvent(ChampEvent.builder()
645 .operation(ChampOperation.DELETE)
652 public void storeObjectIndex(ChampObjectIndex index) {
654 executeStoreObjectIndex(index);
656 // Update the event stream with the current operation.
657 logEvent(ChampEvent.builder()
658 .operation(ChampOperation.STORE)
664 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
666 // Retrieve the index that we are deleting before it's gone, so that we can
667 // report it to the event stream.
668 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
670 executeDeleteObjectIndex(indexName);
672 if(indexToDelete.isPresent()) {
673 // Update the event stream with the current operation.
674 logEvent(ChampEvent.builder()
675 .operation(ChampOperation.DELETE)
676 .entity(indexToDelete.get())
682 public void storeRelationshipIndex(ChampRelationshipIndex index) {
684 executeStoreRelationshipIndex(index);
686 // Update the event stream with the current operation.
687 logEvent(ChampEvent.builder()
688 .operation(ChampOperation.STORE)
694 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
696 // Retrieve the index that we are deleting before it's gone, so that we can
697 // report it to the event stream.
698 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
700 executeDeleteRelationshipIndex(indexName);
702 if(indexToDelete.isPresent()) {
703 // Update the event stream with the current operation.
704 logEvent(ChampEvent.builder()
705 .operation(ChampOperation.DELETE)
706 .entity(indexToDelete.get())
711 private void logOrEnqueueEvent(ChampEvent event, Optional<ChampTransaction> transaction) {
713 if(!transaction.isPresent()) {
714 // Update the event stream with the current operation.
718 // when the TransactionID is present, add it to the event payload before logging/enqueing the event.
719 event.setDbTransactionId ( transaction.get ().id () );
720 transaction.get().logEvent(event);
725 * Submits an event to be published to the event stream.
727 * @param anEvent - The event to be published.
729 public void logEvent(ChampEvent anEvent) {
731 if(eventQueue == null) {
735 logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
736 if(logger.isDebugEnabled()) {
737 logger.debug("Event payload: " + anEvent.toString());
740 // Try to submit the event to be published to the event bus.
741 if(!eventQueue.offer(anEvent)) {
742 logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
748 * This class implements the worker threads for our thread pool which are responsible for
749 * pulling the next outgoing event from the internal buffer and forwarding them to the event
752 * Each publish operation is performed synchronously, so that the thread will only move on
753 * to the next available event once it has actually published the current event to the bus.
755 private class EventPublisherWorker implements Runnable {
757 /** Partition key to use when publishing events to the event stream. We WANT all events
758 * to go to a single partition, so we are just using a hard-coded key for every event. */
759 private static final String EVENTS_PARTITION_KEY = "champEventKey";
767 ChampEvent event = null;
770 // Get the next event to be published from the queue.
771 event = eventQueue.take();
773 } catch (InterruptedException e) {
775 // Restore the interrupted status.
776 Thread.currentThread().interrupt();
779 // Try publishing the event to the event bus. This call will block
782 producer.sendSync(EVENTS_PARTITION_KEY, event.toJson());
784 } catch (Exception e) {
786 logger.error("Failed to publish event to event bus: " + e.getMessage());