import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
+
import org.onap.aai.champcore.ChampCapabilities;
+import org.onap.aai.champcore.ChampCoreMsgs;
import org.onap.aai.champcore.ChampGraph;
import org.onap.aai.champcore.ChampTransaction;
import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
-import org.onap.aai.champcore.event.envelope.ChampEventHeader;
import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
import org.onap.aai.champcore.exceptions.ChampMarshallingException;
import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
import org.onap.aai.champcore.model.ChampRelationshipConstraint;
import org.onap.aai.champcore.model.ChampRelationshipIndex;
import org.onap.aai.champcore.model.ChampSchema;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.event.api.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
*/
public abstract class AbstractLoggingChampGraph implements ChampGraph {
- private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
+ public static final String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
+ public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+ public static final String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
+ public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
+ public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+ protected static final String KEY_PROPERTY_NAME = "aai-uuid";
+ protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";
+
+ /** Pool of worker threads that do the work of publishing the events to the event bus. */
+ protected ThreadPoolExecutor publisherPool;
+
+ /** Client used for publishing events to the event bus. */
+ protected EventPublisher producer;
+
+ /** Internal queue where outgoing events will be buffered until they can be serviced by
+ * the event publisher worker threads. */
+ protected BlockingQueue<ChampEvent> eventQueue;
+
+ /** Number of events that can be queued up for publication before we begin dropping
+ * events. */
+ private Integer eventQueueCapacity;
+
+ /** Number of event publisher worker threads. */
+ private Integer eventStreamPublisherPoolSize;
+
+ private static final Logger logger = LoggerFactory.getInstance().getLogger(AbstractLoggingChampGraph.class);
+
+
+ /**
+ * Create a new instance of the AbstractLoggingChampGraph.
+ *
+ * @param properties - Set of configuration properties for this graph instance.
+ */
+ protected AbstractLoggingChampGraph(Map<String, Object> properties) {
+
+ // Extract the necessary parameters from the configuration properties.
+ configure(properties);
+
+ // Make sure we were passed an event producer as one of our properties, otherwise
+ // there is really nothing more we can do...
+ if(producer == null) {
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "No event stream producer was supplied.");
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "NOTE!! Champ events will NOT be published to the event stream!");
+ return;
+ }
+
+ // Create the blocking queue that we will use to buffer events that we want
+ // published to the event bus.
+ eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
+
+ // Create the executor pool that will do the work of publishing events to the event bus.
+ publisherPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+ new ProducerWorkerThreadFactory());
+
+ try {
+
+ // Start up the producer worker threads.
+ for(int i=0; i<eventStreamPublisherPoolSize; i++) {
+ publisherPool.submit(new EventPublisherWorker());
+ }
+
+ } catch (Exception e) {
+
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "NOTE!! Champ events may NOT be published to the event stream!");
+ return;
+ }
+ }
+
+
public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
- public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
+ public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
/**
* Creates or updates a vertex in the graph data store.
* @param index - The object index to be created/updated.
*/
public abstract void executeStoreObjectIndex(ChampObjectIndex index);
-
public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
public abstract ChampCapabilities capabilities();
-
- public final static String PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
- public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
-
- public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
- public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
-
- public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
-
-
-
- /** Number of events that can be queued up for publication before we begin dropping
- * events. */
- private Integer eventQueueCapacity;
-
- /** Number of event publisher worker threads. */
- private Integer eventStreamPublisherPoolSize;
-
- /** Pool of worker threads that do the work of publishing the events to the event bus. */
- protected ThreadPoolExecutor publisherPool;
-
- /** Client used for publishing events to the event bus. */
- protected EventPublisher producer;
-
- /** Internal queue where outgoing events will be buffered until they can be serviced by
- * the event publisher worker threads. */
- protected BlockingQueue<ChampEvent> eventQueue;
-
-
/**
* Thread factory for the event producer workers.
*/
}
- /**
- * Create a new instance of the AbstractLoggingChampGraph.
- *
- * @param properties - Set of configuration properties for this graph instance.
- */
- protected AbstractLoggingChampGraph(Map<String, Object> properties) {
-
- // Extract the necessary parameters from the configuration properties.
- configure(properties);
-
- // Make sure we were passed an event producer as one of our properties, otherwise
- // there is really nothing more we can do...
- if(producer == null) {
- logger.error("No event stream producer was supplied.");
- logger.error("NOTE!! Champ events will NOT be published to the event stream!");
- return;
- }
-
- // Create the blocking queue that we will use to buffer events that we want
- // published to the event bus.
- eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
-
- // Create the executor pool that will do the work of publishing events to the event bus.
- publisherPool =
- (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
- new ProducerWorkerThreadFactory());
-
- try {
-
- // Start up the producer worker threads.
- for(int i=0; i<eventStreamPublisherPoolSize; i++) {
- publisherPool.submit(new EventPublisherWorker());
- }
-
- } catch (Exception e) {
-
- logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
- logger.error("NOTE!! Champ events may NOT be published to the event stream!");
- return;
- }
- }
-
-
/**
* Process the configuration properties supplied for this graph instance.
*
try {
publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN,
+ "Termination interrupted");
+ Thread.currentThread().interrupt();
+ }
}
if(producer != null) {
producer.close();
} catch (Exception e) {
- logger.error("Failed to stop event stream producer: " + e.getMessage());
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Failed to stop event stream producer: " + e.getMessage());
}
}
}
} catch (ChampTransactionException e) {
- logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
+ logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN,
+ "Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
for(ChampEvent event : enqueuedEvents) {
objectToDelete = retrieveObject(key, transaction);
} catch (ChampUnmarshallingException e) {
- logger.error("Unable to generate delete object log: " + e.getMessage());
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Unable to generate delete object log: " + e.getMessage());
}
executeDeleteObject(key, transaction);
return;
}
- logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
+ logger.info(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_INFO,
+ "Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
if(logger.isDebugEnabled()) {
logger.debug("Event payload: " + anEvent.toString());
}
// Try to submit the event to be published to the event bus.
if(!eventQueue.offer(anEvent)) {
- logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
}
}
} catch (Exception e) {
- logger.error("Failed to publish event to event bus: " + e.getMessage());
+ logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+ "Failed to publish event to event bus: " + e.getMessage());
}
}
}