Merge "Updated champ-lib to use the correct logger"
[aai/champ.git] / champ-lib / champ-core / src / main / java / org / onap / aai / champcore / event / AbstractLoggingChampGraph.java
index d920dbc..1ecf71b 100644 (file)
@@ -32,12 +32,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
+
 import org.onap.aai.champcore.ChampCapabilities;
+import org.onap.aai.champcore.ChampCoreMsgs;
 import org.onap.aai.champcore.ChampGraph;
 import org.onap.aai.champcore.ChampTransaction;
 import org.onap.aai.champcore.event.ChampEvent.ChampOperation;
 import org.onap.aai.champcore.event.envelope.ChampEventEnvelope;
-import org.onap.aai.champcore.event.envelope.ChampEventHeader;
 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
@@ -53,9 +54,10 @@ import org.onap.aai.champcore.model.ChampRelationship;
 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
 import org.onap.aai.champcore.model.ChampRelationshipIndex;
 import org.onap.aai.champcore.model.ChampSchema;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.event.api.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 
 
 
@@ -65,7 +67,81 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractLoggingChampGraph implements ChampGraph {
 
-  private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
+  public static final String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
+  public static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+  public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
+  public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
+  public static final String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
+  protected static final String KEY_PROPERTY_NAME = "aai-uuid";
+  protected static final String NODE_TYPE_PROPERTY_NAME = "aai-node-type";
+
+  /** Pool of worker threads that do the work of publishing the events to the event bus. */
+  protected ThreadPoolExecutor publisherPool;
+
+  /** Client used for publishing events to the event bus. */
+  protected EventPublisher producer;
+
+  /** Internal queue where outgoing events will be buffered until they can be serviced by
+   *  the event publisher worker threads. */
+  protected BlockingQueue<ChampEvent> eventQueue;
+
+  /** Number of events that can be queued up for publication before we begin dropping
+   *  events. */
+  private Integer eventQueueCapacity;
+
+  /** Number of event publisher worker threads. */
+  private Integer eventStreamPublisherPoolSize;
+
+  private static final Logger logger = LoggerFactory.getInstance().getLogger(AbstractLoggingChampGraph.class);
+
+
+  /**
+   * Create a new instance of the AbstractLoggingChampGraph.
+   *
+   * @param properties - Set of configuration properties for this graph instance.
+   */
+  protected AbstractLoggingChampGraph(Map<String, Object> properties) {
+
+    // Extract the necessary parameters from the configuration properties.
+    configure(properties);
+
+    // Make sure we were passed an event producer as one of our properties, otherwise
+    // there is really nothing more we can do...
+    if(producer == null) {
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+          "No event stream producer was supplied.");
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+          "NOTE!! Champ events will NOT be published to the event stream!");
+      return;
+    }
+
+    // Create the blocking queue that we will use to buffer events that we want
+    // published to the event bus.
+    eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
+
+    // Create the executor pool that will do the work of publishing events to the event bus.
+    publisherPool =
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
+            new ProducerWorkerThreadFactory());
+
+    try {
+
+      // Start up the producer worker threads.
+      for(int i=0; i<eventStreamPublisherPoolSize; i++) {
+        publisherPool.submit(new EventPublisherWorker());
+      }
+
+    } catch (Exception e) {
+
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+          "Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR,
+          "NOTE!! Champ events may NOT be published to the event stream!");
+      return;
+    }
+  }
+
+
 
   public abstract Optional<ChampObject>       retrieveObject(Object key) throws ChampUnmarshallingException, ChampTransactionException;
   public abstract Optional<ChampObject>       retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampTransactionException;
@@ -78,8 +154,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
   public abstract Stream<ChampRelationship>   retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException, ChampObjectNotExistsException, ChampTransactionException;
   public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams) throws ChampTransactionException;
-  public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
 
+  public abstract Stream<ChampRelationship>   queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) throws ChampTransactionException;
 
   /**
     * Creates or updates a vertex in the graph data store.
@@ -259,7 +335,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
    * @param index       - The object index to be created/updated.
    */
   public abstract void executeStoreObjectIndex(ChampObjectIndex index);
-
   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
@@ -275,35 +350,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   public abstract ChampCapabilities                capabilities();
 
 
-
-  public final static String  PARAM_EVENT_QUEUE_CAPACITY = "champcore.event.stream.buffer.capacity";
-  public final static Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
-
-  public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champcore.event.stream.publisher-pool-size";
-  public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 5;
-
-  public final static String PARAM_EVENT_STREAM_PRODUCER = "champcore.event.stream.publisher";
-
-
-
-  /** Number of events that can be queued up for publication before we begin dropping
-   *  events. */
-  private Integer eventQueueCapacity;
-
-  /** Number of event publisher worker threads. */
-  private Integer eventStreamPublisherPoolSize;
-
-  /** Pool of worker threads that do the work of publishing the events to the event bus. */
-  protected ThreadPoolExecutor publisherPool;
-
-  /** Client used for publishing events to the event bus. */
-  protected EventPublisher producer;
-
-  /** Internal queue where outgoing events will be buffered until they can be serviced by
-   *  the event publisher worker threads. */
-  protected BlockingQueue<ChampEvent> eventQueue;
-
-
   /**
    * Thread factory for the event producer workers.
    */
@@ -317,49 +363,6 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
   }
 
 
-  /**
-   * Create a new instance of the AbstractLoggingChampGraph.
-   *
-   * @param properties - Set of configuration properties for this graph instance.
-   */
-  protected AbstractLoggingChampGraph(Map<String, Object> properties) {
-
-    // Extract the necessary parameters from the configuration properties.
-    configure(properties);
-
-    // Make sure we were passed an event producer as one of our properties, otherwise
-    // there is really nothing more we can do...
-    if(producer == null) {
-      logger.error("No event stream producer was supplied.");
-      logger.error("NOTE!! Champ events will NOT be published to the event stream!");
-      return;
-    }
-
-    // Create the blocking queue that we will use to buffer events that we want
-    // published to the event bus.
-    eventQueue = new ArrayBlockingQueue<ChampEvent>(eventQueueCapacity);
-
-    // Create the executor pool that will do the work of publishing events to the event bus.
-    publisherPool =
-        (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize,
-                                                          new ProducerWorkerThreadFactory());
-
-    try {
-
-      // Start up the producer worker threads.
-      for(int i=0; i<eventStreamPublisherPoolSize; i++) {
-         publisherPool.submit(new EventPublisherWorker());
-      }
-
-    } catch (Exception e) {
-
-      logger.error("Failed to instantiate event stream producer thread due to: '" + e.getMessage() + "'");
-      logger.error("NOTE!! Champ events may NOT be published to the event stream!");
-      return;
-    }
-  }
-
-
   /**
    * Process the configuration properties supplied for this graph instance.
    *
@@ -399,7 +402,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       try {
         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
-        logger.warn("Termination interrupted");
+        logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN, 
+            "Termination interrupted");
         Thread.currentThread().interrupt();
       }
     }
@@ -410,7 +414,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
         producer.close();
 
       } catch (Exception e) {
-        logger.error("Failed to stop event stream producer: " + e.getMessage());
+        logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
+            "Failed to stop event stream producer: " + e.getMessage());
       }
     }
   }
@@ -425,7 +430,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
 
     } catch (ChampTransactionException e) {
 
-      logger.warn("Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
+      logger.warn(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_WARN, 
+          "Events associated with transaction " + transaction.id() + " not generated due to transaction commit failure.");
 
       List<ChampEvent> enqueuedEvents = transaction.getEnqueuedEvents();
       for(ChampEvent event : enqueuedEvents) {
@@ -513,7 +519,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       objectToDelete = retrieveObject(key, transaction);
 
     } catch (ChampUnmarshallingException e) {
-      logger.error("Unable to generate delete object log: " + e.getMessage());
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
+          "Unable to generate delete object log: " + e.getMessage());
     }
 
     executeDeleteObject(key, transaction);
@@ -735,14 +742,16 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
       return;
     }
 
-    logger.info("Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
+    logger.info(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_INFO, 
+        "Log champcore event with transaction id: " + anEvent.getTransactionId() + " to event bus");
     if(logger.isDebugEnabled()) {
       logger.debug("Event payload: " + anEvent.toString());
     }
 
     // Try to submit the event to be published to the event bus.
     if(!eventQueue.offer(anEvent)) {
-      logger.error("Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
+      logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
+          "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
     }
   }
 
@@ -787,7 +796,8 @@ public abstract class AbstractLoggingChampGraph implements ChampGraph {
 
         } catch (Exception e) {
 
-          logger.error("Failed to publish event to event bus: " + e.getMessage());
+          logger.error(ChampCoreMsgs.CHAMPCORE_ABSTRACT_LOGGING_CHAMP_GRAPH_ERROR, 
+              "Failed to publish event to event bus: " + e.getMessage());
         }
       }
     }