2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.aai.champcore.event;
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
26 import java.io.IOException;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.stream.Stream;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
40 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champcore.exceptions.ChampTransactionException;
44 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
45 import org.onap.aai.champcore.model.ChampObject;
46 import org.onap.aai.champcore.model.ChampObjectConstraint;
47 import org.onap.aai.champcore.model.ChampObjectIndex;
48 import org.onap.aai.champcore.model.ChampPartition;
49 import org.onap.aai.champcore.model.ChampRelationship;
50 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
51 import org.onap.aai.champcore.model.ChampRelationshipIndex;
52 import org.onap.aai.champcore.model.ChampSchema;
54 import org.onap.aai.event.api.EventPublisher;
55 import com.fasterxml.jackson.core.JsonParseException;
56 import com.fasterxml.jackson.databind.JsonMappingException;
59 public class AbstractLoggingChampGraphTest {
61 /** Event stream producer stub. */
62 private InMemoryPublisher producer;
64 /** In memory graph for testing purposes. */
65 private TestGraph testGraph;
69 * Perform any setup tasks that need to be done prior to each test.
74 // Instantiate an event stream producer stub to use in our tests.
75 producer = new InMemoryPublisher();
77 // Instantiate an 'in-memory' graph for test purposes.
78 Map<String, Object> graphProperties = new HashMap<String, Object>();
79 graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
80 graphProperties.put("champcore.event.stream.batch-size", 1);
81 graphProperties.put("champcore.event.stream.publisher", producer);
82 testGraph = new TestGraph(graphProperties);
87 * Perform any cleanup that needs to be done after each test.
92 public void tearDown() throws Exception {
94 // Close our stubbed producer and graph.
101 * Validates that store/replace/delete operation against vertices result in the expected events
102 * being published to the event stream.
104 * @throws ChampMarshallingException
105 * @throws ChampSchemaViolationException
106 * @throws ChampObjectNotExistsException
107 * @throws InterruptedException
108 * @throws JsonParseException
109 * @throws JsonMappingException
110 * @throws IOException
111 * @throws ChampTransactionException
114 public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException,
115 ChampSchemaViolationException,
116 ChampObjectNotExistsException,
117 InterruptedException,
119 JsonMappingException,
121 ChampTransactionException {
123 // Create a vertex and store it in the graph data store.
124 ChampObject obj1 = ChampObject.create()
127 .withProperty("p1", "v1")
128 .withProperty("p2", "v2")
130 testGraph.storeObject(obj1, Optional.empty());
132 // Retrieve the next event from the event stream and validate that it is what we expect.
133 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
134 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
135 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
139 // Create a new vertex based on the one that we already created.
140 ChampObject obj2 = ChampObject.create()
143 .withProperty("p3", "v3")
146 // Now, try doing a replace operation.
147 testGraph.replaceObject(obj2, Optional.empty());
151 // Retrieve the next event from the event stream and validate that it is what we expect.
152 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
153 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
154 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
156 // Finally, delete the vertex.
157 testGraph.deleteObject("123", Optional.empty());
159 // Retrieve the next event from the event stream and validate that it is what we expect.
160 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
161 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
162 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
166 public void vertexOperationsLegacyTest2() throws ChampMarshallingException,
167 ChampSchemaViolationException,
168 ChampObjectNotExistsException,
169 InterruptedException,
171 JsonMappingException,
173 ChampTransactionException {
175 // Create a vertex and store it in the graph data store.
176 ChampObject obj1 = ChampObject.create()
179 .withProperty("p1", "v1")
180 .withProperty("p2", "v2")
182 testGraph.storeObject(obj1);
184 // Retrieve the next event from the event stream and validate that it is what we expect.
185 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
186 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
187 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
191 // Create a new vertex based on the one that we already created.
192 ChampObject obj2 = ChampObject.create()
195 .withProperty("p3", "v3")
198 // Now, try doing a replace operation.
199 testGraph.replaceObject(obj2);
203 // Retrieve the next event from the event stream and validate that it is what we expect.
204 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
205 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
206 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
208 // Finally, delete the vertex.
209 testGraph.deleteObject("123");
211 // Retrieve the next event from the event stream and validate that it is what we expect.
212 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
213 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
214 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
218 * This test validates that performing vertex operations in the case where the data to be
219 * forwarded to the event stream is unavailable results in no event being generated, but
220 * does not otherwise create issues.
222 * @throws ChampMarshallingException
223 * @throws ChampSchemaViolationException
224 * @throws ChampObjectNotExistsException
225 * @throws InterruptedException
226 * @throws JsonParseException
227 * @throws JsonMappingException
228 * @throws IOException
229 * @throws ChampTransactionException
232 public void vertexOperationsWithNullsTest() throws ChampMarshallingException,
233 ChampSchemaViolationException,
234 ChampObjectNotExistsException,
235 InterruptedException,
237 JsonMappingException,
238 IOException, ChampTransactionException {
240 // Setup our test graph to simulate failures to retrieve data from the graph data store.
241 testGraph.returnNulls();
243 // Create a vertex and store it in the graph data store.
244 ChampObject obj1 = ChampObject.create()
247 .withProperty("p1", "v1")
248 .withProperty("p2", "v2")
250 testGraph.storeObject(obj1, Optional.empty());
252 // Check our simulated event stream to verify that an event log was produced.
253 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
255 // Validate that we did not get an event from the stream.
256 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
258 // Create a new vertex based on the one that we already created.
259 ChampObject obj2 = ChampObject.create()
262 .withProperty("p3", "v3")
265 // Now, try doing a replace operation.
266 testGraph.replaceObject(obj2, Optional.empty());
268 // Check our simulated event stream to see if an event log was not produced.
269 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
271 // Validate that we did not get an event from the stream.
272 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
274 // Finally, delete the vertex.
275 testGraph.deleteObject("123", Optional.empty());
277 // Check our simulated event stream to see if an event log was not produced.
278 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
280 // Validate that we did not get an event from the stream.
281 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
286 * Validates that store/replace/delete operation against edges result in the expected events
287 * being published to the event stream.
289 * @throws ChampMarshallingException
290 * @throws ChampSchemaViolationException
291 * @throws ChampObjectNotExistsException
292 * @throws InterruptedException
293 * @throws JsonParseException
294 * @throws JsonMappingException
295 * @throws IOException
296 * @throws ChampUnmarshallingException
297 * @throws ChampRelationshipNotExistsException
298 * @throws ChampTransactionException
301 public void edgeOperationsTest() throws ChampMarshallingException,
302 ChampSchemaViolationException,
303 ChampObjectNotExistsException,
304 InterruptedException,
306 JsonMappingException,
308 ChampUnmarshallingException,
309 ChampRelationshipNotExistsException, ChampTransactionException {
311 // Create two vertices to act as the end points of our edge.
312 ChampObject obj1 = ChampObject.create()
315 .withProperty("p1", "v1")
316 .withProperty("p2", "v2")
319 ChampObject obj2 = ChampObject.create()
322 .withProperty("p3", "v3")
325 // Now, create an edge object and write it to the graph data store.
326 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
327 .property("property-1", "value-1")
328 .property("property-2", "value-2")
330 testGraph.storeRelationship(rel, Optional.empty());
332 // Retrieve the next event from the event stream and validate that it is what we expect.
333 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
334 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
335 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
337 // Now, create another edge object based on the one we just wrote, and use it to perform
338 // a replace operation.
339 ChampRelationship rel2 = ChampRelationship.create()
342 .withProperty("property-3", "value-3")
344 testGraph.replaceRelationship(rel2, Optional.empty());
346 // Retrieve the next event from the event stream and validate that it is what we expect.
347 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
348 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
349 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
351 // Finally, delete our edge.
352 testGraph.deleteRelationship(rel2, Optional.empty());
354 // Retrieve the next event from the event stream and validate that it is what we expect.
355 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
356 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
357 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
362 * This test validates that performing edge operations in the case where the data to be
363 * forwarded to the event stream is unavailable results in no event being generated, but
364 * does not otherwise create issues.
366 * @throws ChampMarshallingException
367 * @throws ChampSchemaViolationException
368 * @throws ChampObjectNotExistsException
369 * @throws InterruptedException
370 * @throws JsonParseException
371 * @throws JsonMappingException
372 * @throws IOException
373 * @throws ChampUnmarshallingException
374 * @throws ChampRelationshipNotExistsException
375 * @throws ChampTransactionException
378 public void edgeOperationsWithNullsTest() throws ChampMarshallingException,
379 ChampSchemaViolationException,
380 ChampObjectNotExistsException,
381 InterruptedException,
383 JsonMappingException,
385 ChampUnmarshallingException,
386 ChampRelationshipNotExistsException, ChampTransactionException {
388 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
390 testGraph.returnNulls();
392 // Create two vertices to act as the endpoints of our edge.
393 ChampObject obj1 = ChampObject.create()
396 .withProperty("p1", "v1")
397 .withProperty("p2", "v2")
400 ChampObject obj2 = ChampObject.create()
403 .withProperty("p3", "v3")
406 // Now, create an edge object and write it to the graph data store.
407 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
408 .property("property-1", "value-1")
409 .property("property-2", "value-2")
411 testGraph.storeRelationship(rel, Optional.empty());
413 // Check our simulated event stream to see if an event log was produced.
414 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
416 // Validate that we did not get an event from the stream.
417 assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
419 // Now, create another edge object based on the one we just wrote, and use it to perform
420 // a replace operation.
421 ChampRelationship rel2 = ChampRelationship.create()
424 .withProperty("property-3", "value-3")
426 testGraph.replaceRelationship(rel2, Optional.empty());
428 // Check our simulated event stream to see if an event log was produced.
429 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
431 // Validate that we did not get an event from the stream.
432 assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
437 * Validates that store/replace/delete operation against partitions result in the expected events
438 * being published to the event stream.
440 * @throws ChampMarshallingException
441 * @throws ChampSchemaViolationException
442 * @throws ChampObjectNotExistsException
443 * @throws InterruptedException
444 * @throws JsonParseException
445 * @throws JsonMappingException
446 * @throws IOException
447 * @throws ChampUnmarshallingException
448 * @throws ChampRelationshipNotExistsException
449 * @throws ChampTransactionException
452 public void partitionOperationsTest() throws ChampMarshallingException,
453 ChampSchemaViolationException,
454 ChampObjectNotExistsException,
455 InterruptedException,
457 JsonMappingException,
459 ChampUnmarshallingException,
460 ChampRelationshipNotExistsException, ChampTransactionException {
462 // Create the vertices and edge objects that we need to create a partition.
463 ChampObject obj1 = ChampObject.create()
466 .withProperty("p1", "v1")
467 .withProperty("p2", "v2")
470 ChampObject obj2 = ChampObject.create()
473 .withProperty("p3", "v3")
476 // Now, create an edge object and write it to the graph data store.
477 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
478 .property("property-1", "value-1")
479 .property("property-2", "value-2")
482 // Now, create our partition object and store it in the graph.
483 ChampPartition partition = ChampPartition.create()
486 .withRelationship(rel)
488 testGraph.storePartition(partition, Optional.empty());
490 // Retrieve the next event from the event stream and validate that it is what we expect.
491 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
492 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
493 assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
495 // Now, delete our partition.
496 testGraph.deletePartition(partition, Optional.empty());
498 // Retrieve the next event from the event stream and validate that it is what we expect.
499 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
500 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
501 assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
506 * This test validates that performing partition operations in the case where the data to be
507 * forwarded to the event stream is unavailable results in no event being generated, but
508 * does not otherwise create issues.
510 * @throws ChampMarshallingException
511 * @throws ChampSchemaViolationException
512 * @throws ChampObjectNotExistsException
513 * @throws InterruptedException
514 * @throws JsonParseException
515 * @throws JsonMappingException
516 * @throws IOException
517 * @throws ChampUnmarshallingException
518 * @throws ChampRelationshipNotExistsException
519 * @throws ChampTransactionException
522 public void partitionOperationsWithNullsTest() throws ChampMarshallingException,
523 ChampSchemaViolationException,
524 ChampObjectNotExistsException,
525 InterruptedException,
527 JsonMappingException,
529 ChampUnmarshallingException,
530 ChampRelationshipNotExistsException, ChampTransactionException {
532 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
534 testGraph.returnNulls();
536 // Create all of the objects we need to create a partition, and store the partition
538 ChampObject obj1 = ChampObject.create()
541 .withProperty("p1", "v1")
542 .withProperty("p2", "v2")
545 ChampObject obj2 = ChampObject.create()
548 .withProperty("p3", "v3")
551 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
552 .property("property-1", "value-1")
553 .property("property-2", "value-2")
556 ChampPartition partition = ChampPartition.create()
559 .withRelationship(rel)
561 testGraph.storePartition(partition, Optional.empty());
563 // Check our simulated event stream to see if an an event log was produced.
564 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
566 // Validate that we did not get an event from the stream.
567 assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
572 * Validates that store/replace/delete operation against vertex indexes result in the expected
573 * events being published to the event stream.
575 * @throws ChampMarshallingException
576 * @throws ChampSchemaViolationException
577 * @throws ChampObjectNotExistsException
578 * @throws InterruptedException
579 * @throws JsonParseException
580 * @throws JsonMappingException
581 * @throws IOException
582 * @throws ChampUnmarshallingException
583 * @throws ChampRelationshipNotExistsException
584 * @throws ChampIndexNotExistsException
587 public void indexOperationsTest() throws ChampMarshallingException,
588 ChampSchemaViolationException,
589 ChampObjectNotExistsException,
590 InterruptedException,
592 JsonMappingException,
594 ChampUnmarshallingException,
595 ChampRelationshipNotExistsException,
596 ChampIndexNotExistsException {
598 // Create an index object and store it in the graph.
599 ChampObjectIndex objIndex = ChampObjectIndex.create()
604 testGraph.storeObjectIndex(objIndex);
606 // Retrieve the next event from the event stream and validate that it is what we expect.
607 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
608 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
609 assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
611 // Now, delete our partition.
612 testGraph.deleteObjectIndex("myIndex");
614 // Retrieve the next event from the event stream and validate that it is what we expect.
615 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
616 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
617 assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
621 * This test validates that performing index operations in the case where the data to be
622 * forwarded to the event stream is unavailable results in no event being generated, but
623 * does not otherwise create issues.
625 * @throws ChampMarshallingException
626 * @throws ChampSchemaViolationException
627 * @throws ChampObjectNotExistsException
628 * @throws InterruptedException
629 * @throws JsonParseException
630 * @throws JsonMappingException
631 * @throws IOException
632 * @throws ChampUnmarshallingException
633 * @throws ChampRelationshipNotExistsException
634 * @throws ChampIndexNotExistsException
637 public void indexOperationsWithNullsTest() throws ChampMarshallingException,
638 ChampSchemaViolationException,
639 ChampObjectNotExistsException,
640 InterruptedException,
642 JsonMappingException,
644 ChampUnmarshallingException,
645 ChampRelationshipNotExistsException,
646 ChampIndexNotExistsException {
648 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
650 testGraph.returnNulls();
652 // Create an index object and store it in the graph.
653 ChampObjectIndex objIndex = ChampObjectIndex.create()
658 testGraph.storeObjectIndex(objIndex);
660 // Check our simulated event stream to see if an an event log was produced.
661 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
663 // Now, delete our index.
664 testGraph.deleteObjectIndex("myIndex");
666 // Check our simulated event stream to see if an an event log was produced.
667 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
669 // Validate that we did not get an event from the stream.
670 assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
675 * This test validates that performing relationship index operations in the case where
676 * the data to be forwarded to the event stream is unavailable results in no event being
677 * generated, but does not otherwise create issues.
679 * @throws ChampMarshallingException
680 * @throws ChampSchemaViolationException
681 * @throws ChampObjectNotExistsException
682 * @throws InterruptedException
683 * @throws JsonParseException
684 * @throws JsonMappingException
685 * @throws IOException
686 * @throws ChampUnmarshallingException
687 * @throws ChampRelationshipNotExistsException
688 * @throws ChampIndexNotExistsException
691 public void relationshipIndexOperationsTest() throws ChampMarshallingException,
692 ChampSchemaViolationException,
693 ChampObjectNotExistsException,
694 InterruptedException,
696 JsonMappingException,
698 ChampUnmarshallingException,
699 ChampRelationshipNotExistsException,
700 ChampIndexNotExistsException {
702 // Create a relationship index object and store it in the graph.
703 ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
708 testGraph.storeRelationshipIndex(relIndex);
710 // Retrieve the next event from the event stream and validate that it is what we expect.
711 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
712 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
713 assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
715 // Now, delete our partition.
716 testGraph.deleteRelationshipIndex("myIndex");
718 // Retrieve the next event from the event stream and validate that it is what we expect.
719 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
720 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
721 assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
726 * This test validates that performing index operations in the case where the data to be
727 * forwarded to the event stream is unavailable results in no event being generated, but
728 * does not otherwise create issues.
730 * @throws ChampMarshallingException
731 * @throws ChampSchemaViolationException
732 * @throws ChampObjectNotExistsException
733 * @throws InterruptedException
734 * @throws JsonParseException
735 * @throws JsonMappingException
736 * @throws IOException
737 * @throws ChampUnmarshallingException
738 * @throws ChampRelationshipNotExistsException
739 * @throws ChampIndexNotExistsException
742 public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException,
743 ChampSchemaViolationException,
744 ChampObjectNotExistsException,
745 InterruptedException,
747 JsonMappingException,
749 ChampUnmarshallingException,
750 ChampRelationshipNotExistsException,
751 ChampIndexNotExistsException {
753 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
755 testGraph.returnNulls();
757 // Create a relationship index object and store it in the graph.
758 ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
764 testGraph.storeRelationshipIndex(relIndex);
766 // Check our simulated event stream to see if an an event log was produced.
767 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
769 // Now, delete our index.
770 testGraph.deleteRelationshipIndex("myIndex");
772 // Check our simulated event stream to see if an event log was produced.
773 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
775 // Validate that we did not get an event from the stream.
776 assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
781 * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which
782 * we can use to validate that log events get generated without worrying about having a real
785 private class TestGraph extends AbstractLoggingChampGraph {
787 /** If set, this causes simulated retrieve operations to fail. */
788 private boolean returnNulls = false;
791 protected TestGraph(Map<String, Object> properties) {
795 public void returnNulls() {
800 public void shutdown() {
802 publisherPool = null;
808 public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction)
809 throws ChampMarshallingException,
810 ChampSchemaViolationException,
811 ChampObjectNotExistsException {
821 public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction)
822 throws ChampMarshallingException,
823 ChampSchemaViolationException,
824 ChampObjectNotExistsException {
834 public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
835 return retrieveObject(key, Optional.empty());
839 public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
842 return(Optional.of(ChampObject.create()
847 return Optional.empty();
852 public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
857 public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
858 return queryObjects(queryParams, Optional.empty());
863 public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
864 // Not used by any tests.
869 public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
870 throws ChampUnmarshallingException,
871 ChampMarshallingException,
872 ChampObjectNotExistsException,
873 ChampSchemaViolationException,
874 ChampRelationshipNotExistsException {
884 public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
885 throws ChampUnmarshallingException,
886 ChampMarshallingException,
887 ChampSchemaViolationException,
888 ChampRelationshipNotExistsException {
898 public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
899 return retrieveRelationship(key, Optional.empty());
903 public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
904 // Not used by any tests.
909 public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
910 // Not used by any tests.
914 public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
915 throws ChampUnmarshallingException, ChampObjectNotExistsException {
916 return retrieveRelationships(object, Optional.empty());
920 public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
921 throws ChampUnmarshallingException, ChampObjectNotExistsException {
923 // Not used by any tests.
928 public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
929 return queryRelationships(queryParams, Optional.empty());
933 public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
935 // Not used by any tests.
940 public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction)
941 throws ChampSchemaViolationException,
942 ChampRelationshipNotExistsException,
943 ChampMarshallingException,
944 ChampObjectNotExistsException {
954 public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
955 // Not used by any tests.
959 public void executeStoreObjectIndex(ChampObjectIndex index) {
960 // Not used by any tests.
964 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
967 return Optional.of(ChampObjectIndex.create()
969 .onType("doesnt matter")
970 .forField("doesnt matter")
973 return Optional.empty();
978 public Stream<ChampObjectIndex> retrieveObjectIndices() {
979 // Not used by any tests.
984 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
985 // Not used by any tests.
989 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
990 // Not used by any tests.
994 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
996 return Optional.of(ChampRelationshipIndex.create()
998 .onType("doesnt matter")
999 .forField("doesnt matter")
1002 return Optional.empty();
1007 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
1008 // Not used by any tests.
1013 public void executeDeleteRelationshipIndex(String indexName)
1014 throws ChampIndexNotExistsException {
1015 // Not used by any tests.
1019 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1020 // Not used by any tests.
1024 public ChampSchema retrieveSchema() {
1025 // Not used by any tests.
1030 public void updateSchema(ChampObjectConstraint objectConstraint)
1031 throws ChampSchemaViolationException {
1032 // Not used by any tests.
1036 public void updateSchema(ChampRelationshipConstraint schema)
1037 throws ChampSchemaViolationException {
1038 // Not used by any tests.
1042 public void deleteSchema() {
1043 // Not used by any tests.
1047 public ChampCapabilities capabilities() {
1048 // Not used by any tests.
1053 public ChampTransaction openTransaction() {
1054 // Not used by any tests.
1059 public void commitTransaction(ChampTransaction transaction) {
1060 // Not used by any tests.
1065 public void rollbackTransaction(ChampTransaction transaction) {
1066 // Not used by any tests.
1070 private class InMemoryPublisher implements EventPublisher {
1072 public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
1073 public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
1074 private boolean failMode=false;
1077 public void enterFailMode() {
1082 public int sendSync(String partitionKey, String message) throws Exception {
1085 eventStream.add(message);
1088 failedMsgs.add(message);
1089 throw new IOException("nope");
1094 public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
1096 for(String msg : messages) {
1098 eventStream.add(msg);
1101 failedMsgs.add(msg);
1102 throw new IOException("nope");
1109 public int sendSync(String message) throws Exception {
1111 eventStream.add(message);
1114 failedMsgs.add(message);
1115 throw new IOException("nope");
1120 public int sendSync(Collection<String> messages) throws Exception {
1122 for(String msg : messages) {
1124 eventStream.add(msg);
1127 failedMsgs.add(msg);
1128 throw new IOException("nope");
1134 public void sendAsync(String partitionKey, String message) throws Exception {
1136 eventStream.add(message);
1138 failedMsgs.add(message);
1139 throw new IOException("nope");
1143 public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
1144 for(String msg : messages) {
1146 eventStream.add(msg);
1148 failedMsgs.add(msg);
1149 throw new IOException("nope");
1154 public void sendAsync(String message) throws Exception {
1156 eventStream.add(message);
1158 failedMsgs.add(message);
1159 throw new IOException("nope");
1163 public void sendAsync(Collection<String> messages) throws Exception {
1164 for(String msg : messages) {
1166 eventStream.add(msg);
1168 failedMsgs.add(msg);
1169 throw new IOException("nope");
1175 public void close() throws Exception {
1176 // TODO Auto-generated method stub