Port champ-microservice project restructure
[aai/champ.git] / champ-lib / champ-core / src / test / java / org / onap / aai / champcore / event / AbstractLoggingChampGraphTest.java
  * ============LICENSE_END============================================
  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  */
-package org.onap.aai.champ.event;
+package org.onap.aai.champcore.event;
 
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -37,25 +34,25 @@ import java.util.stream.Stream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.aai.champ.ChampCapabilities;
-import org.onap.aai.champ.event.AbstractLoggingChampGraph;
-import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
-import org.onap.aai.champ.exceptions.ChampMarshallingException;
-import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
-import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
-import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
-import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
-import org.onap.aai.champ.model.ChampObject;
-import org.onap.aai.champ.model.ChampObjectConstraint;
-import org.onap.aai.champ.model.ChampObjectIndex;
-import org.onap.aai.champ.model.ChampPartition;
-import org.onap.aai.champ.model.ChampRelationship;
-import org.onap.aai.champ.model.ChampRelationshipConstraint;
-import org.onap.aai.champ.model.ChampRelationshipIndex;
-import org.onap.aai.champ.model.ChampSchema;
-import org.slf4j.Logger;
+import org.onap.aai.champcore.ChampCapabilities;
+import org.onap.aai.champcore.ChampTransaction;
+import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampMarshallingException;
+import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
+import org.onap.aai.champcore.exceptions.ChampTransactionException;
+import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.aai.champcore.model.ChampObjectConstraint;
+import org.onap.aai.champcore.model.ChampObjectIndex;
+import org.onap.aai.champcore.model.ChampPartition;
+import org.onap.aai.champcore.model.ChampRelationship;
+import org.onap.aai.champcore.model.ChampRelationshipConstraint;
+import org.onap.aai.champcore.model.ChampRelationshipIndex;
+import org.onap.aai.champcore.model.ChampSchema;
 
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.onap.aai.event.api.EventPublisher;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 
@@ -80,17 +77,20 @@ public class AbstractLoggingChampGraphTest {
     
     // Instantiate an 'in-memory' graph for test purposes.
     Map<String, Object> graphProperties = new HashMap<String, Object>();
-    graphProperties.put("champ.event.stream.hosts", "myeventstreamhost");
-    graphProperties.put("champ.event.stream.batch-size", 1);
-    testGraph = new TestGraph(graphProperties, producer);
+    graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
+    graphProperties.put("champcore.event.stream.batch-size", 1);
+    graphProperties.put("champcore.event.stream.publisher", producer);
+    testGraph = new TestGraph(graphProperties);
   }
   
   
   /**
    * Perform any cleanup that needs to be done after each test.
+   * 
+   * @throws Exception 
    */
   @After
-  public void tearDown() {
+  public void tearDown() throws Exception {
     
     // Close our stubbed producer and graph.
     producer.close();
@@ -109,15 +109,69 @@ public class AbstractLoggingChampGraphTest {
    * @throws JsonParseException
    * @throws JsonMappingException
    * @throws IOException
+   * @throws ChampTransactionException 
    */
   @Test
-  public void vertexOperationsTest() throws ChampMarshallingException, 
-                                            ChampSchemaViolationException, 
-                                            ChampObjectNotExistsException, 
-                                            InterruptedException, 
-                                            JsonParseException, 
-                                            JsonMappingException, 
-                                            IOException {
+  public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException, 
+                                                             ChampSchemaViolationException, 
+                                                             ChampObjectNotExistsException, 
+                                                             InterruptedException, 
+                                                             JsonParseException, 
+                                                             JsonMappingException, 
+                                            IOException, 
+                                            ChampTransactionException {
+            
+    // Create a vertex and store it in the graph data store.
+    ChampObject obj1 = ChampObject.create()
+        .ofType("foo")
+        .withKey("123")
+        .withProperty("p1", "v1")
+        .withProperty("p2", "v2")
+        .build();  
+    testGraph.storeObject(obj1, Optional.empty());
+    
+    // Retrieve the next event from the event stream and validate that it is what we expect.
+    String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+    assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
+    assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+  
+    
+    
+    // Create a new vertex based on the one that we already created.
+    ChampObject obj2 = ChampObject.create()
+        .from(obj1)
+        .withKey("123")
+        .withProperty("p3", "v3")
+        .build();
+    
+    // Now, try doing a replace operation.
+    testGraph.replaceObject(obj2, Optional.empty());
+    
+    
+    
+    // Retrieve the next event from the event stream and validate that it is what we expect.
+    loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+    assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
+    assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+    
+    // Finally, delete the vertex.
+    testGraph.deleteObject("123", Optional.empty());
+    
+    // Retrieve the next event from the event stream and validate that it is what we expect.
+    loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+    assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
+    assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+  }
+  
+  @Test
+  public void vertexOperationsLegacyTest2() throws ChampMarshallingException, 
+                                                   ChampSchemaViolationException, 
+                                                   ChampObjectNotExistsException, 
+                                                   InterruptedException, 
+                                                   JsonParseException, 
+                                                   JsonMappingException, 
+                                                   IOException, 
+                                                   ChampTransactionException {
             
     // Create a vertex and store it in the graph data store.
     ChampObject obj1 = ChampObject.create()
@@ -127,12 +181,14 @@ public class AbstractLoggingChampGraphTest {
         .withProperty("p2", "v2")
         .build();  
     testGraph.storeObject(obj1);
-
+    
     // Retrieve the next event from the event stream and validate that it is what we expect.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
   
+    
+    
     // Create a new vertex based on the one that we already created.
     ChampObject obj2 = ChampObject.create()
         .from(obj1)
@@ -143,6 +199,8 @@ public class AbstractLoggingChampGraphTest {
     // Now, try doing a replace operation.
     testGraph.replaceObject(obj2);
     
+    
+    
     // Retrieve the next event from the event stream and validate that it is what we expect.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
@@ -157,7 +215,6 @@ public class AbstractLoggingChampGraphTest {
     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
   }
   
-  
   /**
    * This test validates that performing vertex operations in the case where the data to be
    * forwarded to the event stream is unavailable results in no event being generated, but
@@ -170,6 +227,7 @@ public class AbstractLoggingChampGraphTest {
    * @throws JsonParseException
    * @throws JsonMappingException
    * @throws IOException
+   * @throws ChampTransactionException 
    */
   @Test
   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
@@ -178,7 +236,7 @@ public class AbstractLoggingChampGraphTest {
                                                      InterruptedException, 
                                                      JsonParseException, 
                                                      JsonMappingException, 
-                                                     IOException {
+                                                     IOException, ChampTransactionException {
             
     // Setup our test graph to simulate failures to retrieve data from the graph data store.
     testGraph.returnNulls();
@@ -190,7 +248,7 @@ public class AbstractLoggingChampGraphTest {
         .withProperty("p1", "v1")
         .withProperty("p2", "v2")
         .build();  
-    testGraph.storeObject(obj1);
+    testGraph.storeObject(obj1, Optional.empty());
 
     // Check our simulated event stream to verify that an event log was produced.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -206,7 +264,7 @@ public class AbstractLoggingChampGraphTest {
         .build();
     
     // Now, try doing a replace operation.
-    testGraph.replaceObject(obj2);
+    testGraph.replaceObject(obj2, Optional.empty());
     
     // Check our simulated event stream to see if an event log was not produced.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -215,7 +273,7 @@ public class AbstractLoggingChampGraphTest {
     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
     
     // Finally, delete the vertex.
-    testGraph.deleteObject("123");
+    testGraph.deleteObject("123", Optional.empty());
     
     // Check our simulated event stream to see if an event log was not produced.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -238,6 +296,7 @@ public class AbstractLoggingChampGraphTest {
    * @throws IOException
    * @throws ChampUnmarshallingException
    * @throws ChampRelationshipNotExistsException
+   * @throws ChampTransactionException 
    */
   @Test
   public void edgeOperationsTest() throws ChampMarshallingException, 
@@ -248,7 +307,7 @@ public class AbstractLoggingChampGraphTest {
                                           JsonMappingException, 
                                           IOException, 
                                           ChampUnmarshallingException, 
-                                          ChampRelationshipNotExistsException {
+                                          ChampRelationshipNotExistsException, ChampTransactionException {
     
     // Create two vertices to act as the end points of our edge.
     ChampObject obj1 = ChampObject.create()
@@ -269,7 +328,7 @@ public class AbstractLoggingChampGraphTest {
         .property("property-1", "value-1")
         .property("property-2", "value-2")
         .build();
-    testGraph.storeRelationship(rel);
+    testGraph.storeRelationship(rel, Optional.empty());
     
     // Retrieve the next event from the event stream and validate that it is what we expect.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -283,7 +342,7 @@ public class AbstractLoggingChampGraphTest {
         .withKey("123")
         .withProperty("property-3", "value-3")
         .build();
-    testGraph.replaceRelationship(rel2);
+    testGraph.replaceRelationship(rel2, Optional.empty());
 
     // Retrieve the next event from the event stream and validate that it is what we expect.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -291,7 +350,7 @@ public class AbstractLoggingChampGraphTest {
     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
     
     // Finally, delete our edge.
-    testGraph.deleteRelationship(rel2);
+    testGraph.deleteRelationship(rel2, Optional.empty());
     
     // Retrieve the next event from the event stream and validate that it is what we expect.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -314,6 +373,7 @@ public class AbstractLoggingChampGraphTest {
    * @throws IOException
    * @throws ChampUnmarshallingException
    * @throws ChampRelationshipNotExistsException
+   * @throws ChampTransactionException 
    */
   @Test
   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
@@ -324,7 +384,7 @@ public class AbstractLoggingChampGraphTest {
                                                    JsonMappingException, 
                                                    IOException, 
                                                    ChampUnmarshallingException, 
-                                                   ChampRelationshipNotExistsException {
+                                                   ChampRelationshipNotExistsException, ChampTransactionException {
     
     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
     // events.
@@ -349,7 +409,7 @@ public class AbstractLoggingChampGraphTest {
         .property("property-1", "value-1")
         .property("property-2", "value-2")
         .build();
-    testGraph.storeRelationship(rel);
+    testGraph.storeRelationship(rel, Optional.empty());
     
     // Check our simulated event stream to see if an event log was produced.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -364,7 +424,7 @@ public class AbstractLoggingChampGraphTest {
         .withKey("123")
         .withProperty("property-3", "value-3")
         .build();
-    testGraph.replaceRelationship(rel2);
+    testGraph.replaceRelationship(rel2, Optional.empty());
     
     // Check our simulated event stream to see if an event log was produced.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -387,6 +447,7 @@ public class AbstractLoggingChampGraphTest {
    * @throws IOException
    * @throws ChampUnmarshallingException
    * @throws ChampRelationshipNotExistsException
+   * @throws ChampTransactionException 
    */
   @Test
   public void partitionOperationsTest() throws ChampMarshallingException, 
@@ -397,7 +458,7 @@ public class AbstractLoggingChampGraphTest {
                                                JsonMappingException, 
                                                IOException, 
                                                ChampUnmarshallingException, 
-                                               ChampRelationshipNotExistsException {
+                                               ChampRelationshipNotExistsException, ChampTransactionException {
     
     // Create the vertices and edge objects that we need to create a partition.
     ChampObject obj1 = ChampObject.create()
@@ -425,7 +486,7 @@ public class AbstractLoggingChampGraphTest {
         .withObject(obj2)
         .withRelationship(rel)
         .build();
-    testGraph.storePartition(partition);
+    testGraph.storePartition(partition, Optional.empty());
     
     // Retrieve the next event from the event stream and validate that it is what we expect.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -433,7 +494,7 @@ public class AbstractLoggingChampGraphTest {
     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
 
     // Now, delete our partition.
-    testGraph.deletePartition(partition);
+    testGraph.deletePartition(partition, Optional.empty());
     
     // Retrieve the next event from the event stream and validate that it is what we expect.
     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -456,6 +517,7 @@ public class AbstractLoggingChampGraphTest {
    * @throws IOException
    * @throws ChampUnmarshallingException
    * @throws ChampRelationshipNotExistsException
+   * @throws ChampTransactionException 
    */
   @Test
   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
@@ -466,7 +528,7 @@ public class AbstractLoggingChampGraphTest {
                                           JsonMappingException, 
                                           IOException, 
                                           ChampUnmarshallingException, 
-                                          ChampRelationshipNotExistsException {
+                                          ChampRelationshipNotExistsException, ChampTransactionException {
     
     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
     // events.
@@ -497,7 +559,7 @@ public class AbstractLoggingChampGraphTest {
         .withObject(obj2)
         .withRelationship(rel)
         .build();
-    testGraph.storePartition(partition);
+    testGraph.storePartition(partition, Optional.empty());
     
     // Check our simulated event stream to see if an an event log was produced.
     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
@@ -727,10 +789,8 @@ public class AbstractLoggingChampGraphTest {
     private boolean returnNulls = false;
     
     
-    protected TestGraph(Map<String, Object> properties, CambriaPublisher producer) {
-      super(properties);
-      
-      setProducer(producer);
+    protected TestGraph(Map<String, Object> properties) {
+      super(properties);      
     }
 
     public void returnNulls() {
@@ -746,9 +806,11 @@ public class AbstractLoggingChampGraphTest {
     }
     
     @Override
-    public ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException,
-                                                                     ChampSchemaViolationException, 
-                                                                     ChampObjectNotExistsException {
+    public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) 
+        throws ChampMarshallingException,
+               ChampSchemaViolationException, 
+               ChampObjectNotExistsException {
+      
       if(!returnNulls) {
         return object;
       } else {
@@ -757,9 +819,11 @@ public class AbstractLoggingChampGraphTest {
     }
 
     @Override
-    public ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException,
-                                                                       ChampSchemaViolationException, 
-                                                                       ChampObjectNotExistsException {
+    public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) 
+        throws ChampMarshallingException,
+               ChampSchemaViolationException, 
+               ChampObjectNotExistsException {
+      
       if(!returnNulls) {
         return object;
       } else {
@@ -769,6 +833,11 @@ public class AbstractLoggingChampGraphTest {
 
     @Override
     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
+      return retrieveObject(key, Optional.empty());
+    }
+    
+    @Override
+    public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
       
       if(!returnNulls) {
         return(Optional.of(ChampObject.create()
@@ -781,18 +850,24 @@ public class AbstractLoggingChampGraphTest {
     }
 
     @Override
-    public void executeDeleteObject(Object key) throws ChampObjectNotExistsException {
+    public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
    
     }
 
     @Override
     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
+      return queryObjects(queryParams, Optional.empty());
+    }
+
+   
+    @Override
+    public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
       // Not used by any tests.
       return null;
     }
 
     @Override
-    public ChampRelationship executeStoreRelationship(ChampRelationship relationship) 
+    public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction
         throws ChampUnmarshallingException, 
                ChampMarshallingException, 
                ChampObjectNotExistsException, 
@@ -807,7 +882,7 @@ public class AbstractLoggingChampGraphTest {
     }
 
     @Override
-    public ChampRelationship executeReplaceRelationship(ChampRelationship relationship)
+    public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
         throws ChampUnmarshallingException, 
                ChampMarshallingException,
                ChampSchemaViolationException, 
@@ -822,18 +897,29 @@ public class AbstractLoggingChampGraphTest {
 
     @Override
     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
+      return retrieveRelationship(key, Optional.empty());
+    }
+    
+    @Override
+    public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
       // Not used by any tests.
       return null;
     }
 
     @Override
-    public void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
+    public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
       // Not used by any tests.   
     }
 
     @Override
     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
         throws ChampUnmarshallingException, ChampObjectNotExistsException {
+      return retrieveRelationships(object, Optional.empty());
+    }
+    
+    @Override
+    public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
+        throws ChampUnmarshallingException, ChampObjectNotExistsException {
       
       // Not used by any tests.
       return null;
@@ -841,13 +927,18 @@ public class AbstractLoggingChampGraphTest {
 
     @Override
     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
+      return queryRelationships(queryParams, Optional.empty());
+    }
+    
+    @Override
+    public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
       
       // Not used by any tests.
       return null;
     }
 
     @Override
-    public ChampPartition executeStorePartition(ChampPartition partition) 
+    public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction
         throws ChampSchemaViolationException, 
                ChampRelationshipNotExistsException,
                ChampMarshallingException, 
@@ -861,7 +952,7 @@ public class AbstractLoggingChampGraphTest {
     }
 
     @Override
-    public void executeDeletePartition(ChampPartition graph) {
+    public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
       // Not used by any tests.     
     }
 
@@ -958,73 +1049,133 @@ public class AbstractLoggingChampGraphTest {
       // Not used by any tests.
       return null;
     }
+
+    @Override
+    public ChampTransaction openTransaction() {
+      // Not used by any tests.
+      return null;
+    }
+
+    @Override
+    public void commitTransaction(ChampTransaction transaction) {
+      // Not used by any tests.
+      
+    }
+
+    @Override
+    public void rollbackTransaction(ChampTransaction transaction) {
+      // Not used by any tests.    
+    }
   }
   
-  private class InMemoryPublisher implements CambriaPublisher {
+  private class InMemoryPublisher implements EventPublisher {
 
     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
-    
     private boolean failMode=false;
     
+    
     public void enterFailMode() {
       failMode=true;
     }
     
     @Override
-    public void logTo(Logger log) {
-      // Not used by any tests. 
-    }
-
-    @Override
-    public void setApiCredentials(String apiKey, String apiSecret) {
-      // Not used by any tests.  
+    public int sendSync(String partitionKey, String message) throws Exception {
+      
+      if(!failMode) {
+        eventStream.add(message);
+        return 0;
+      } else {
+        failedMsgs.add(message);
+        throw new IOException("nope");
+      }
     }
-
+    
     @Override
-    public void clearApiCredentials() {
-      // Not used by any tests.  
+    public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
+      
+      for(String msg : messages) {
+        if(!failMode) {
+          eventStream.add(msg);
+          return 0;
+        } else {
+          failedMsgs.add(msg);
+          throw new IOException("nope");
+        }
+      }
+      return 0;
     }
-
+    
     @Override
-    public void setHttpBasicCredentials(String username, String password) {
-      // Not used by any tests.  
+    public int sendSync(String message) throws Exception {
+      if(!failMode) {
+        eventStream.add(message);
+        return 0;
+      } else {
+        failedMsgs.add(message);
+        throw new IOException("nope");
+      }
     }
-
+    
     @Override
-    public void clearHttpBasicCredentials() {
-      // Not used by any tests.  
+    public int sendSync(Collection<String> messages) throws Exception {
+      
+      for(String msg : messages) {
+        if(!failMode) {
+          eventStream.add(msg);
+          return 0;
+        } else {
+          failedMsgs.add(msg);
+          throw new IOException("nope");
+        }
+      }
+      return 0;
     }
-
     @Override
-    public int send(String partition, String msg) throws IOException {
-      
+    public void sendAsync(String partitionKey, String message) throws Exception {
       if(!failMode) {
-        eventStream.add(msg);
-        return 0;
+        eventStream.add(message);
       } else {
-        failedMsgs.add(msg);
+        failedMsgs.add(message);
         throw new IOException("nope");
-      }
+      }      
     }
-
     @Override
-    public int send(message msg) throws IOException {
-      eventStream.add(msg.toString());
-      return 0;
+    public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
+      for(String msg : messages) {
+        if(!failMode) {
+          eventStream.add(msg);
+        } else {
+          failedMsgs.add(msg);
+          throw new IOException("nope");
+        }
+      }     
     }
-
     @Override
-    public int send(Collection<message> msgs) throws IOException {
-      for(message msg : msgs) {
-        eventStream.add(msg.toString());
-      }
-      return 0;
+    public void sendAsync(String message) throws Exception {
+      if(!failMode) {
+        eventStream.add(message);
+      } else {
+        failedMsgs.add(message);
+        throw new IOException("nope");
+      }      
     }
-
     @Override
-    public void close() {
-      // Not used by any tests.
-    }     
+    public void sendAsync(Collection<String> messages) throws Exception {
+      for(String msg : messages) {
+        if(!failMode) {
+          eventStream.add(msg);
+        } else {
+          failedMsgs.add(msg);
+          throw new IOException("nope");
+        }
+      }    
+    }
+    
+    @Override
+    public void close() throws Exception {
+      // TODO Auto-generated method stub
+      
+    }    
   }
 }