+ public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
+ public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+ public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
+ public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
+ public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+ protected static final String KEY_PROPERTY_NAME = "aai-uuid";
+ protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";
+
+ /** Pool of worker threads that do the work of publishing the events to the event bus. */
+ protected ThreadPoolExecutor publisherPool;
+
+ /** Client used for publishing events to the event bus. */
+ protected EventPublisher producer;
+
+ /** Internal queue where outgoing events will be buffered until they can be serviced by
+ * the event publisher worker threads. */
+ protected BlockingQueue<ChampEvent> eventQueue;
+
+ /** Number of events that can be queued up for publication before we begin dropping
+ * events. */
+ private Integer eventQueueCapacity;
+
+ /** Number of event publisher worker threads. */
+ private Integer eventStreamPublisherPoolSize;
+
+ private static final Logger logger = LoggerFactory.getInstance().getLogger(AbstractLoggingChampGraph.class);
+
+
+ /**
+ * Create a new instance of the AbstractLoggingChampGraph.
+ *
+ * @param properties - Set of configuration properties for this graph instance.
+ */
+ protected AbstractLoggingChampGraph(Map<String, Object> properties) {
+
+ // Extract the necessary parameters from the configuration properties.
+ configure(properties);
+
+ // Make sure we were passed an event producer as one of our properties, otherwise
+ // there is really nothing more we can do...
+ if(producer == null) {
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "No event stream producer was supplied.");
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "NOTE!! Champ events will NOT be published to the event stream!");
+ return;
+ }
+
+ // Create the blocking queue that we will use to buffer events that we want
+ // published to the event bus.
+ eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
+
+ // Create the executor pool that will do the work of publishing events to the event bus.
+ publisherPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+ new ProducerWorkerThreadFactory());
+
+ try {
+
+ // Start up the producer worker threads.
+ for(int i=0; i<eventStreamPublisherPoolSize; i++) {
+ publisherPool.submit(new EventPublisherWorker());
+ }
+
+ } catch (Exception e) {
+
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "NOTE!! Champ events may NOT be published to the event stream!");
+ return;
+ }
+ }
+
+