* ============LICENSE_END============================================
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
-package org.onap.aai.champ.event;
+package org.onap.aai.champcore.event;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onap.aai.champ.ChampCapabilities;
-import org.onap.aai.champ.event.AbstractLoggingChampGraph;
-import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
-import org.onap.aai.champ.exceptions.ChampMarshallingException;
-import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
-import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
-import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
-import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
-import org.onap.aai.champ.model.ChampObject;
-import org.onap.aai.champ.model.ChampObjectConstraint;
-import org.onap.aai.champ.model.ChampObjectIndex;
-import org.onap.aai.champ.model.ChampPartition;
-import org.onap.aai.champ.model.ChampRelationship;
-import org.onap.aai.champ.model.ChampRelationshipConstraint;
-import org.onap.aai.champ.model.ChampRelationshipIndex;
-import org.onap.aai.champ.model.ChampSchema;
-import org.slf4j.Logger;
+import org.onap.aai.champcore.ChampCapabilities;
+import org.onap.aai.champcore.ChampTransaction;
+import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampMarshallingException;
+import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
+import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
+import org.onap.aai.champcore.exceptions.ChampTransactionException;
+import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
+import org.onap.aai.champcore.model.ChampObject;
+import org.onap.aai.champcore.model.ChampObjectConstraint;
+import org.onap.aai.champcore.model.ChampObjectIndex;
+import org.onap.aai.champcore.model.ChampPartition;
+import org.onap.aai.champcore.model.ChampRelationship;
+import org.onap.aai.champcore.model.ChampRelationshipConstraint;
+import org.onap.aai.champcore.model.ChampRelationshipIndex;
+import org.onap.aai.champcore.model.ChampSchema;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.onap.aai.event.api.EventPublisher;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
// Instantiate an 'in-memory' graph for test purposes.
Map<String, Object> graphProperties = new HashMap<String, Object>();
- graphProperties.put("champ.event.stream.hosts", "myeventstreamhost");
- graphProperties.put("champ.event.stream.batch-size", 1);
- testGraph = new TestGraph(graphProperties, producer);
+ graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
+ graphProperties.put("champcore.event.stream.batch-size", 1);
+ graphProperties.put("champcore.event.stream.publisher", producer);
+ testGraph = new TestGraph(graphProperties);
}
/**
* Perform any cleanup that needs to be done after each test.
+ *
+ * @throws Exception
*/
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
// Close our stubbed producer and graph.
producer.close();
* @throws JsonParseException
* @throws JsonMappingException
* @throws IOException
+ * @throws ChampTransactionException
*/
@Test
- public void vertexOperationsTest() throws ChampMarshallingException,
- ChampSchemaViolationException,
- ChampObjectNotExistsException,
- InterruptedException,
- JsonParseException,
- JsonMappingException,
- IOException {
+ public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampObjectNotExistsException,
+ InterruptedException,
+ JsonParseException,
+ JsonMappingException,
+ IOException,
+ ChampTransactionException {
+
+ // Create a vertex and store it in the graph data store.
+ ChampObject obj1 = ChampObject.create()
+ .ofType("foo")
+ .withKey("123")
+ .withProperty("p1", "v1")
+ .withProperty("p2", "v2")
+ .build();
+ testGraph.storeObject(obj1, Optional.empty());
+
+ // Retrieve the next event from the event stream and validate that it is what we expect.
+ String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
+ assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+
+
+
+ // Create a new vertex based on the one that we already created.
+ ChampObject obj2 = ChampObject.create()
+ .from(obj1)
+ .withKey("123")
+ .withProperty("p3", "v3")
+ .build();
+
+ // Now, try doing a replace operation.
+ testGraph.replaceObject(obj2, Optional.empty());
+
+
+
+ // Retrieve the next event from the event stream and validate that it is what we expect.
+ loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
+ assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+
+ // Finally, delete the vertex.
+ testGraph.deleteObject("123", Optional.empty());
+
+ // Retrieve the next event from the event stream and validate that it is what we expect.
+ loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
+ assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
+ assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+ }
+
+ @Test
+ public void vertexOperationsLegacyTest2() throws ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampObjectNotExistsException,
+ InterruptedException,
+ JsonParseException,
+ JsonMappingException,
+ IOException,
+ ChampTransactionException {
// Create a vertex and store it in the graph data store.
ChampObject obj1 = ChampObject.create()
.withProperty("p2", "v2")
.build();
testGraph.storeObject(obj1);
-
+
// Retrieve the next event from the event stream and validate that it is what we expect.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
+
+
// Create a new vertex based on the one that we already created.
ChampObject obj2 = ChampObject.create()
.from(obj1)
// Now, try doing a replace operation.
testGraph.replaceObject(obj2);
+
+
// Retrieve the next event from the event stream and validate that it is what we expect.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
}
-
/**
* This test validates that performing vertex operations in the case where the data to be
* forwarded to the event stream is unavailable results in no event being generated, but
* @throws JsonParseException
* @throws JsonMappingException
* @throws IOException
+ * @throws ChampTransactionException
*/
@Test
public void vertexOperationsWithNullsTest() throws ChampMarshallingException,
InterruptedException,
JsonParseException,
JsonMappingException,
- IOException {
+ IOException, ChampTransactionException {
// Setup our test graph to simulate failures to retrieve data from the graph data store.
testGraph.returnNulls();
.withProperty("p1", "v1")
.withProperty("p2", "v2")
.build();
- testGraph.storeObject(obj1);
+ testGraph.storeObject(obj1, Optional.empty());
// Check our simulated event stream to verify that an event log was produced.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
.build();
// Now, try doing a replace operation.
- testGraph.replaceObject(obj2);
+ testGraph.replaceObject(obj2, Optional.empty());
// Check our simulated event stream to see if an event log was not produced.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
// Finally, delete the vertex.
- testGraph.deleteObject("123");
+ testGraph.deleteObject("123", Optional.empty());
// Check our simulated event stream to see if an event log was not produced.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
* @throws IOException
* @throws ChampUnmarshallingException
* @throws ChampRelationshipNotExistsException
+ * @throws ChampTransactionException
*/
@Test
public void edgeOperationsTest() throws ChampMarshallingException,
JsonMappingException,
IOException,
ChampUnmarshallingException,
- ChampRelationshipNotExistsException {
+ ChampRelationshipNotExistsException, ChampTransactionException {
// Create two vertices to act as the end points of our edge.
ChampObject obj1 = ChampObject.create()
.property("property-1", "value-1")
.property("property-2", "value-2")
.build();
- testGraph.storeRelationship(rel);
+ testGraph.storeRelationship(rel, Optional.empty());
// Retrieve the next event from the event stream and validate that it is what we expect.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
.withKey("123")
.withProperty("property-3", "value-3")
.build();
- testGraph.replaceRelationship(rel2);
+ testGraph.replaceRelationship(rel2, Optional.empty());
// Retrieve the next event from the event stream and validate that it is what we expect.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
// Finally, delete our edge.
- testGraph.deleteRelationship(rel2);
+ testGraph.deleteRelationship(rel2, Optional.empty());
// Retrieve the next event from the event stream and validate that it is what we expect.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
* @throws IOException
* @throws ChampUnmarshallingException
* @throws ChampRelationshipNotExistsException
+ * @throws ChampTransactionException
*/
@Test
public void edgeOperationsWithNullsTest() throws ChampMarshallingException,
JsonMappingException,
IOException,
ChampUnmarshallingException,
- ChampRelationshipNotExistsException {
+ ChampRelationshipNotExistsException, ChampTransactionException {
// Set up our graph to simulate a failure to retrieve some of the data we need to generate
// events.
.property("property-1", "value-1")
.property("property-2", "value-2")
.build();
- testGraph.storeRelationship(rel);
+ testGraph.storeRelationship(rel, Optional.empty());
// Check our simulated event stream to see if an event log was produced.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
.withKey("123")
.withProperty("property-3", "value-3")
.build();
- testGraph.replaceRelationship(rel2);
+ testGraph.replaceRelationship(rel2, Optional.empty());
// Check our simulated event stream to see if an event log was produced.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
* @throws IOException
* @throws ChampUnmarshallingException
* @throws ChampRelationshipNotExistsException
+ * @throws ChampTransactionException
*/
@Test
public void partitionOperationsTest() throws ChampMarshallingException,
JsonMappingException,
IOException,
ChampUnmarshallingException,
- ChampRelationshipNotExistsException {
+ ChampRelationshipNotExistsException, ChampTransactionException {
// Create the vertices and edge objects that we need to create a partition.
ChampObject obj1 = ChampObject.create()
.withObject(obj2)
.withRelationship(rel)
.build();
- testGraph.storePartition(partition);
+ testGraph.storePartition(partition, Optional.empty());
// Retrieve the next event from the event stream and validate that it is what we expect.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
// Now, delete our partition.
- testGraph.deletePartition(partition);
+ testGraph.deletePartition(partition, Optional.empty());
// Retrieve the next event from the event stream and validate that it is what we expect.
loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
* @throws IOException
* @throws ChampUnmarshallingException
* @throws ChampRelationshipNotExistsException
+ * @throws ChampTransactionException
*/
@Test
public void partitionOperationsWithNullsTest() throws ChampMarshallingException,
JsonMappingException,
IOException,
ChampUnmarshallingException,
- ChampRelationshipNotExistsException {
+ ChampRelationshipNotExistsException, ChampTransactionException {
// Set up our graph to simulate a failure to retrieve some of the data we need to generate
// events.
.withObject(obj2)
.withRelationship(rel)
.build();
- testGraph.storePartition(partition);
+ testGraph.storePartition(partition, Optional.empty());
// Check our simulated event stream to see if an an event log was produced.
String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
private boolean returnNulls = false;
- protected TestGraph(Map<String, Object> properties, CambriaPublisher producer) {
- super(properties);
-
- setProducer(producer);
+ protected TestGraph(Map<String, Object> properties) {
+ super(properties);
}
public void returnNulls() {
}
@Override
- public ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException,
- ChampSchemaViolationException,
- ChampObjectNotExistsException {
+ public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction)
+ throws ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampObjectNotExistsException {
+
if(!returnNulls) {
return object;
} else {
}
@Override
- public ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException,
- ChampSchemaViolationException,
- ChampObjectNotExistsException {
+ public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction)
+ throws ChampMarshallingException,
+ ChampSchemaViolationException,
+ ChampObjectNotExistsException {
+
if(!returnNulls) {
return object;
} else {
@Override
public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
+ return retrieveObject(key, Optional.empty());
+ }
+
+ @Override
+ public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
if(!returnNulls) {
return(Optional.of(ChampObject.create()
}
@Override
- public void executeDeleteObject(Object key) throws ChampObjectNotExistsException {
+ public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
}
@Override
public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
+ return queryObjects(queryParams, Optional.empty());
+ }
+
+
+ @Override
+ public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
// Not used by any tests.
return null;
}
@Override
- public ChampRelationship executeStoreRelationship(ChampRelationship relationship)
+ public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
throws ChampUnmarshallingException,
ChampMarshallingException,
ChampObjectNotExistsException,
}
@Override
- public ChampRelationship executeReplaceRelationship(ChampRelationship relationship)
+ public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
throws ChampUnmarshallingException,
ChampMarshallingException,
ChampSchemaViolationException,
@Override
public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
+ return retrieveRelationship(key, Optional.empty());
+ }
+
+ @Override
+ public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
// Not used by any tests.
return null;
}
@Override
- public void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
+ public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
// Not used by any tests.
}
@Override
public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
throws ChampUnmarshallingException, ChampObjectNotExistsException {
+ return retrieveRelationships(object, Optional.empty());
+ }
+
+ @Override
+ public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
+ throws ChampUnmarshallingException, ChampObjectNotExistsException {
// Not used by any tests.
return null;
@Override
public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
+ return queryRelationships(queryParams, Optional.empty());
+ }
+
+ @Override
+ public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
// Not used by any tests.
return null;
}
@Override
- public ChampPartition executeStorePartition(ChampPartition partition)
+ public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction)
throws ChampSchemaViolationException,
ChampRelationshipNotExistsException,
ChampMarshallingException,
}
@Override
- public void executeDeletePartition(ChampPartition graph) {
+ public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
// Not used by any tests.
}
// Not used by any tests.
return null;
}
+
+ @Override
+ public ChampTransaction openTransaction() {
+ // Not used by any tests.
+ return null;
+ }
+
+ @Override
+ public void commitTransaction(ChampTransaction transaction) {
+ // Not used by any tests.
+
+ }
+
+ @Override
+ public void rollbackTransaction(ChampTransaction transaction) {
+ // Not used by any tests.
+ }
}
- private class InMemoryPublisher implements CambriaPublisher {
+ private class InMemoryPublisher implements EventPublisher {
public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
-
private boolean failMode=false;
+
public void enterFailMode() {
failMode=true;
}
@Override
- public void logTo(Logger log) {
- // Not used by any tests.
- }
-
- @Override
- public void setApiCredentials(String apiKey, String apiSecret) {
- // Not used by any tests.
+ public int sendSync(String partitionKey, String message) throws Exception {
+
+ if(!failMode) {
+ eventStream.add(message);
+ return 0;
+ } else {
+ failedMsgs.add(message);
+ throw new IOException("nope");
+ }
}
-
+
@Override
- public void clearApiCredentials() {
- // Not used by any tests.
+ public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
+
+ for(String msg : messages) {
+ if(!failMode) {
+ eventStream.add(msg);
+ return 0;
+ } else {
+ failedMsgs.add(msg);
+ throw new IOException("nope");
+ }
+ }
+ return 0;
}
-
+
@Override
- public void setHttpBasicCredentials(String username, String password) {
- // Not used by any tests.
+ public int sendSync(String message) throws Exception {
+ if(!failMode) {
+ eventStream.add(message);
+ return 0;
+ } else {
+ failedMsgs.add(message);
+ throw new IOException("nope");
+ }
}
-
+
@Override
- public void clearHttpBasicCredentials() {
- // Not used by any tests.
+ public int sendSync(Collection<String> messages) throws Exception {
+
+ for(String msg : messages) {
+ if(!failMode) {
+ eventStream.add(msg);
+ return 0;
+ } else {
+ failedMsgs.add(msg);
+ throw new IOException("nope");
+ }
+ }
+ return 0;
}
-
@Override
- public int send(String partition, String msg) throws IOException {
-
+ public void sendAsync(String partitionKey, String message) throws Exception {
if(!failMode) {
- eventStream.add(msg);
- return 0;
+ eventStream.add(message);
} else {
- failedMsgs.add(msg);
+ failedMsgs.add(message);
throw new IOException("nope");
- }
+ }
}
-
@Override
- public int send(message msg) throws IOException {
- eventStream.add(msg.toString());
- return 0;
+ public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
+ for(String msg : messages) {
+ if(!failMode) {
+ eventStream.add(msg);
+ } else {
+ failedMsgs.add(msg);
+ throw new IOException("nope");
+ }
+ }
}
-
@Override
- public int send(Collection<message> msgs) throws IOException {
- for(message msg : msgs) {
- eventStream.add(msg.toString());
- }
- return 0;
+ public void sendAsync(String message) throws Exception {
+ if(!failMode) {
+ eventStream.add(message);
+ } else {
+ failedMsgs.add(message);
+ throw new IOException("nope");
+ }
}
-
@Override
- public void close() {
- // Not used by any tests.
- }
+ public void sendAsync(Collection<String> messages) throws Exception {
+ for(String msg : messages) {
+ if(!failMode) {
+ eventStream.add(msg);
+ } else {
+ failedMsgs.add(msg);
+ throw new IOException("nope");
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
}
}