[AAI-80 Amsterdam] checking in source code
[aai/champ.git] / src / main / java / org / openecomp / aai / champ / event / AbstractLoggingChampGraph.java
diff --git a/src/main/java/org/openecomp/aai/champ/event/AbstractLoggingChampGraph.java b/src/main/java/org/openecomp/aai/champ/event/AbstractLoggingChampGraph.java
new file mode 100644 (file)
index 0000000..a8e0e2f
--- /dev/null
@@ -0,0 +1,496 @@
+/**
+ * ============LICENSE_START==========================================
+ * org.onap.aai
+ * ===================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017 Amdocs
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END============================================
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.openecomp.aai.champ.event;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.openecomp.aai.champ.ChampCapabilities;
+import org.openecomp.aai.champ.ChampGraph;
+import org.openecomp.aai.champ.event.ChampEvent.ChampOperation;
+import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
+import org.openecomp.aai.champ.exceptions.ChampMarshallingException;
+import org.openecomp.aai.champ.exceptions.ChampObjectNotExistsException;
+import org.openecomp.aai.champ.exceptions.ChampRelationshipNotExistsException;
+import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
+import org.openecomp.aai.champ.exceptions.ChampUnmarshallingException;
+import org.openecomp.aai.champ.model.ChampObject;
+import org.openecomp.aai.champ.model.ChampObjectConstraint;
+import org.openecomp.aai.champ.model.ChampObjectIndex;
+import org.openecomp.aai.champ.model.ChampPartition;
+import org.openecomp.aai.champ.model.ChampRelationship;
+import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
+import org.openecomp.aai.champ.model.ChampRelationshipIndex;
+import org.openecomp.aai.champ.model.ChampSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaPublisher;
+
+
+/**
+ * This class provides the hooks to allow Champ operations to be logged to an event
+ * stream.
+ */
+public abstract class AbstractLoggingChampGraph implements ChampGraph {
+
+  private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
+  
+  public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
+  public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
+  public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
+  public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
+  public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
+  public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
+  public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
+  public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
+  public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
+  public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
+  public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
+  public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
+  public abstract void                             executeDeletePartition(ChampPartition graph);
+  public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
+  public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
+  public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
+  public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
+  public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
+  public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
+  public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
+  public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
+  public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
+  public abstract ChampSchema                      retrieveSchema();
+  public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
+  public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
+  public abstract void                             deleteSchema();
+  public abstract ChampCapabilities                capabilities();
+
+   
+  /** Configuration property for setting the comma-separated list of servers to use for
+   *  communicating with the event bus. */
+  public final static String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
+  
+  /** Configuration property for setting the number of events that we will try to 'batch' 
+   *  up before sending them to the event bus. */
+  public final static String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
+  public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
+  
+  /** Configuration property for setting the maximum amount of time to wait for a batch of
+   *  outgoing messages to fill up before sending the batch. */
+  public final static String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
+  public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
+  
+  public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
+  public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
+  
+  /** The event stream topic that we will publish Champ events to. */
+  public final static String EVENT_TOPIC = "champRawEvents";
+    
+  /** Number of events to 'batch up' before actually publishing them to the event bus. */
+  private Integer eventStreamBatchSize;
+  
+  private Integer eventStreamBatchTimeout;
+  
+  private Integer eventStreamPublisherPoolSize;
+  
+  /** Comma-separated list of hosts for connecting to the event bus. */
+  private String  eventStreamHosts = null;
+  
+  /** Client used for publishing messages to the event bus. */
+  protected CambriaPublisher producer;
+
+  /** Pool of worker threads that do the work of publishing the events to the event bus. */
+  protected ThreadPoolExecutor publisherPool;
+  
+  
+  /**
+   * Create a new instance of the AbstractLoggingChampGraph.
+   * 
+   * @param properties - Set of configuration properties for this graph instance.
+   */
+  protected AbstractLoggingChampGraph(Map<String, Object> properties) {
+    
+    // Extract the necessary parameters from the configuration properties.
+    configure(properties);
+      
+    // Create the executor pool that will do the work of publishing events to the event bus.
+    publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
+    
+    // Make sure that we are actually provided a list of hosts for connecting to the event
+    // bus before we actually try to do anything useful.
+    if(eventStreamHosts == null) {
+      
+      // We were not supplied a list of event bus hosts, so just bail.
+      logger.error("Cannot initialize event stream publisher without at least one event bus host.");
+      logger.error("NOTE!! Champ events will NOT be published to the event stream!");
+      return;
+    }
+         
+    try {
+        
+      // Instantiate the producer that we will use to publish events to the event stream.
+      setProducer(new CambriaClientBuilders.PublisherBuilder()
+                        .usingHosts(eventStreamHosts)
+                        .onTopic(EVENT_TOPIC)
+                        .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
+                        .build());
+      
+    } catch (MalformedURLException | GeneralSecurityException e) {
+      
+      logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
+      logger.error("NOTE: Champ events will NOT be published to the event stream");
+      producer = null;
+    }
+  }
+
+      
+  /**
+   * Process the configuration properties supplied for this graph instance.
+   * 
+   * @param properties - Configuration parameters.
+   */
+  private void configure(Map<String, Object> properties) {
+    
+    eventStreamBatchSize = 
+        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
+    eventStreamBatchTimeout = 
+        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
+    eventStreamPublisherPoolSize = 
+        (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
+    
+    if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
+      eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
+    } 
+  }
+  
+  public void setProducer(CambriaPublisher aProducer) {
+    
+    producer = aProducer;
+  }
+  
+  private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
+    
+    if(properties.containsKey(property)) {
+      return properties.get(property);
+    } else {
+      return defaultValue;
+    }
+  }
+  
+  @Override
+  public void shutdown() {
+    
+    if(publisherPool != null) {
+      publisherPool.shutdown();
+      
+      try {
+        publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {}
+    }
+    
+    if(producer != null) {
+      producer.close();
+    }
+  }
+  
+  public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
+    
+    ChampObject storedObject = executeStoreObject(object);
+    
+    if(storedObject != null) {
+      
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.STORE)
+                    .entity(storedObject)
+                    .build());
+    }
+    
+    return storedObject;
+  }
+  
+  
+  public ChampObject replaceObject(ChampObject object)
+      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
+    
+    ChampObject replacedObject = executeReplaceObject(object);
+    
+    if(replacedObject != null) {
+      
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.REPLACE)
+                    .entity(replacedObject)
+                    .build());
+    }
+    
+    return replacedObject;
+  }
+  
+
+  public void deleteObject(Object key) throws ChampObjectNotExistsException {
+
+    // Retrieve the object that we are deleting before it's gone, so that we can 
+    // report it to the event stream.
+    Optional<ChampObject> objectToDelete = Optional.empty();
+    try {
+      objectToDelete = retrieveObject(key);
+      
+    } catch (ChampUnmarshallingException e) {
+      logger.error("Unable to generate delete object log: " + e.getMessage());
+    }
+    
+    executeDeleteObject(key);
+    
+    if(objectToDelete.isPresent()) {
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.DELETE)
+                    .entity(objectToDelete.get())
+                    .build());
+    }
+  }
+  
+  
+  public ChampRelationship storeRelationship(ChampRelationship relationship)
+      throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
+
+    ChampRelationship storedRelationship = executeStoreRelationship(relationship);
+    
+    if(storedRelationship != null) {
+      
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.STORE)
+                    .entity(storedRelationship)
+                    .build());
+    }
+    
+    return storedRelationship;
+  }
+  
+  
+  public ChampRelationship replaceRelationship(ChampRelationship relationship)
+      throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
+
+    ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
+    
+    if(replacedRelationship != null) {
+      
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.REPLACE)
+                    .entity(replacedRelationship)
+                    .build());
+    }
+    
+    return replacedRelationship;
+  }
+  
+  
+  public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
+
+    executeDeleteRelationship(relationship);
+    
+    // Update the event stream with the current operation.
+    logEvent(ChampEvent.builder()
+                  .operation(ChampOperation.DELETE)
+                  .entity(relationship)
+                  .build());
+  }
+  
+  
+  public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
+
+    ChampPartition storedPartition = executeStorePartition(partition);
+    
+    if(storedPartition != null) {
+      
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.STORE)
+                    .entity(storedPartition)
+                    .build());
+    }
+    
+    return storedPartition;
+  }
+  
+  
+  public void deletePartition(ChampPartition graph) {
+
+    executeDeletePartition(graph);
+    
+    // Update the event stream with the current operation.
+    logEvent(ChampEvent.builder()
+                  .operation(ChampOperation.DELETE)
+                  .entity(graph)
+                  .build());
+  }
+  
+  
+  public void storeObjectIndex(ChampObjectIndex index) {
+
+    executeStoreObjectIndex(index);
+    
+    // Update the event stream with the current operation.
+    logEvent(ChampEvent.builder()
+                  .operation(ChampOperation.STORE)
+                  .entity(index)
+                  .build());
+  }
+  
+  
+  public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
+    
+    // Retrieve the index that we are deleting before it's gone, so that we can 
+    // report it to the event stream.
+    Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
+    
+    executeDeleteObjectIndex(indexName);
+    
+    if(indexToDelete.isPresent()) {
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.DELETE)
+                    .entity(indexToDelete.get()) 
+                    .build());
+    }
+  }
+  
+  
+  public void storeRelationshipIndex(ChampRelationshipIndex index) {
+
+    executeStoreRelationshipIndex(index);
+    
+    // Update the event stream with the current operation.
+    logEvent(ChampEvent.builder()
+                  .operation(ChampOperation.STORE)
+                  .entity(index) 
+                  .build());
+  }
+  
+  
+  public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
+
+    // Retrieve the index that we are deleting before it's gone, so that we can 
+    // report it to the event stream.
+    Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
+    
+    executeDeleteRelationshipIndex(indexName);
+    
+    if(indexToDelete.isPresent()) {
+      // Update the event stream with the current operation.
+      logEvent(ChampEvent.builder()
+                    .operation(ChampOperation.DELETE)
+                    .entity(indexToDelete.get()) 
+                    .build());
+    }
+  }
+  
+  
+  /**
+   * Submits an event to be published to the event stream.
+   * 
+   * @param anEvent - The event to be published.
+   */
+  public void logEvent(ChampEvent anEvent) {
+    
+    if(logger.isDebugEnabled()) {
+      logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
+    }
+    
+    try {
+      
+      // Try submitting the event to be published to the event bus.
+      publisherPool.execute(new EventPublisher(anEvent));
+    
+    } catch (RejectedExecutionException re) {
+      logger.error("Event could not be published to the event bus due to: " + re.getMessage());
+      
+    } catch (NullPointerException npe) {
+      logger.error("Can not publish null event to event bus.");
+    }
+  }
+  
+  
+  /**
+   * This class runs as a background thread and is responsible for pulling Champ events off
+   * of the internal queue and publishing them to the event stream.
+   */
+  private class EventPublisher implements Runnable {
+    
+    /** Partition key to use when publishing events to the event stream.  We WANT all events
+     *  to go to a single partition, so we are just using a hard-coded key for every event. */
+    private static final String EVENTS_PARTITION_KEY = "champEventKey";
+    
+    private ChampEvent event;
+    
+    public EventPublisher(ChampEvent event) {
+      this.event = event;
+    }
+    
+    
+    @Override
+    public void run() {
+
+      boolean done = false;
+      while(!done && !Thread.currentThread().isInterrupted()) {
+        try {
+          
+          // Make sure that we actually have a producer instance to use to publish
+          // the event with.
+          if(producer != null) {
+            
+            // Try publishing the event to the event bus.
+            producer.send(EVENTS_PARTITION_KEY, event.toJson());
+            
+          } else if (logger.isDebugEnabled()) {            
+            logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
+          }
+          
+          done = true;
+          
+        } catch (IOException e) {
+  
+          // We were unable to publish to the event bus, so wait a bit and then try
+          // again.
+          try {
+            Thread.sleep(500);
+            
+          } catch (InterruptedException e1) {
+            logger.info("Stopping event publisher worker thread.");
+            return;
+          }
+        }           
+      }
+    }
+  }
+}