2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.onap.aai.champcore.event;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
27 import java.io.IOException;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Stream;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.onap.aai.champcore.ChampCapabilities;
38 import org.onap.aai.champcore.ChampTransaction;
39 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
40 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
41 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
44 import org.onap.aai.champcore.exceptions.ChampTransactionException;
45 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
46 import org.onap.aai.champcore.model.ChampObject;
47 import org.onap.aai.champcore.model.ChampObjectConstraint;
48 import org.onap.aai.champcore.model.ChampObjectIndex;
49 import org.onap.aai.champcore.model.ChampPartition;
50 import org.onap.aai.champcore.model.ChampRelationship;
51 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
52 import org.onap.aai.champcore.model.ChampRelationshipIndex;
53 import org.onap.aai.champcore.model.ChampSchema;
55 import org.onap.aai.event.api.EventPublisher;
56 import com.fasterxml.jackson.core.JsonParseException;
57 import com.fasterxml.jackson.databind.JsonMappingException;
60 public class AbstractLoggingChampGraphTest {
62 /** Event stream producer stub. */
63 private InMemoryPublisher producer;
65 /** In memory graph for testing purposes. */
66 private TestGraph testGraph;
70 * Perform any setup tasks that need to be done prior to each test.
75 // Instantiate an event stream producer stub to use in our tests.
76 producer = new InMemoryPublisher();
78 // Instantiate an 'in-memory' graph for test purposes.
79 Map<String, Object> graphProperties = new HashMap<String, Object>();
80 graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
81 graphProperties.put("champcore.event.stream.batch-size", 1);
82 graphProperties.put("champcore.event.stream.publisher", producer);
83 testGraph = new TestGraph(graphProperties);
88 * Perform any cleanup that needs to be done after each test.
93 public void tearDown() throws Exception {
95 // Close our stubbed producer and graph.
102 * Validates that store/replace/delete operations against vertices result in the expected events
103 * being published to the event stream.
105 * @throws ChampMarshallingException
106 * @throws ChampSchemaViolationException
107 * @throws ChampObjectNotExistsException
108 * @throws InterruptedException
109 * @throws JsonParseException
110 * @throws JsonMappingException
111 * @throws IOException
112 * @throws ChampTransactionException
115 public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException,
116 ChampSchemaViolationException,
117 ChampObjectNotExistsException,
118 InterruptedException,
120 JsonMappingException,
122 ChampTransactionException {
124 // Create a vertex and store it in the graph data store.
125 ChampObject obj1 = ChampObject.create()
128 .withProperty("p1", "v1")
129 .withProperty("p2", "v2")
131 testGraph.storeObject(obj1, Optional.empty());
133 // Retrieve the next event from the event stream and validate that it is what we expect.
134 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
135 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
136 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
140 // Create a new vertex based on the one that we already created.
141 ChampObject obj2 = ChampObject.create()
144 .withProperty("p3", "v3")
147 // Now, try doing a replace operation.
148 testGraph.replaceObject(obj2, Optional.empty());
152 // Retrieve the next event from the event stream and validate that it is what we expect.
153 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
154 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
155 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
157 // Finally, delete the vertex.
158 testGraph.deleteObject("123", Optional.empty());
160 // Retrieve the next event from the event stream and validate that it is what we expect.
161 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
162 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
163 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
167 public void vertexOperationsLegacyTest2() throws ChampMarshallingException,
168 ChampSchemaViolationException,
169 ChampObjectNotExistsException,
170 InterruptedException,
172 JsonMappingException,
174 ChampTransactionException {
176 // Create a vertex and store it in the graph data store.
177 ChampObject obj1 = ChampObject.create()
180 .withProperty("p1", "v1")
181 .withProperty("p2", "v2")
183 testGraph.storeObject(obj1);
185 // Retrieve the next event from the event stream and validate that it is what we expect.
186 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
187 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
188 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
192 // Create a new vertex based on the one that we already created.
193 ChampObject obj2 = ChampObject.create()
196 .withProperty("p3", "v3")
199 // Now, try doing a replace operation.
200 testGraph.replaceObject(obj2);
204 // Retrieve the next event from the event stream and validate that it is what we expect.
205 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
206 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
207 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
209 // Finally, delete the vertex.
210 testGraph.deleteObject("123");
212 // Retrieve the next event from the event stream and validate that it is what we expect.
213 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
214 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
215 assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
219 * This test validates that performing vertex operations in the case where the data to be
220 * forwarded to the event stream is unavailable results in no event being generated, but
221 * does not otherwise create issues.
223 * @throws ChampMarshallingException
224 * @throws ChampSchemaViolationException
225 * @throws ChampObjectNotExistsException
226 * @throws InterruptedException
227 * @throws JsonParseException
228 * @throws JsonMappingException
229 * @throws IOException
230 * @throws ChampTransactionException
233 public void vertexOperationsWithNullsTest() throws ChampMarshallingException,
234 ChampSchemaViolationException,
235 ChampObjectNotExistsException,
236 InterruptedException,
238 JsonMappingException,
239 IOException, ChampTransactionException {
241 // Setup our test graph to simulate failures to retrieve data from the graph data store.
242 testGraph.returnNulls();
244 // Create a vertex and store it in the graph data store.
245 ChampObject obj1 = ChampObject.create()
248 .withProperty("p1", "v1")
249 .withProperty("p2", "v2")
251 testGraph.storeObject(obj1, Optional.empty());
253 // Check our simulated event stream to see if an event log was produced.
254 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
256 // Validate that we did not get an event from the stream.
257 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
259 // Create a new vertex based on the one that we already created.
260 ChampObject obj2 = ChampObject.create()
263 .withProperty("p3", "v3")
266 // Now, try doing a replace operation.
267 testGraph.replaceObject(obj2, Optional.empty());
269 // Check our simulated event stream to see if an event log was not produced.
270 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
272 // Validate that we did not get an event from the stream.
273 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
275 // Finally, delete the vertex.
276 testGraph.deleteObject("123", Optional.empty());
278 // Check our simulated event stream to see if an event log was not produced.
279 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
281 // Validate that we did not get an event from the stream.
282 assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
287 * Validates that store/replace/delete operations against edges result in the expected events
288 * being published to the event stream.
290 * @throws ChampMarshallingException
291 * @throws ChampSchemaViolationException
292 * @throws ChampObjectNotExistsException
293 * @throws InterruptedException
294 * @throws JsonParseException
295 * @throws JsonMappingException
296 * @throws IOException
297 * @throws ChampUnmarshallingException
298 * @throws ChampRelationshipNotExistsException
299 * @throws ChampTransactionException
302 public void edgeOperationsTest() throws ChampMarshallingException,
303 ChampSchemaViolationException,
304 ChampObjectNotExistsException,
305 InterruptedException,
307 JsonMappingException,
309 ChampUnmarshallingException,
310 ChampRelationshipNotExistsException, ChampTransactionException {
312 // Create two vertices to act as the end points of our edge.
313 ChampObject obj1 = ChampObject.create()
316 .withProperty("p1", "v1")
317 .withProperty("p2", "v2")
320 ChampObject obj2 = ChampObject.create()
323 .withProperty("p3", "v3")
326 // Now, create an edge object and write it to the graph data store.
327 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
328 .property("property-1", "value-1")
329 .property("property-2", "value-2")
331 testGraph.storeRelationship(rel, Optional.empty());
333 // Retrieve the next event from the event stream and validate that it is what we expect.
334 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
335 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
336 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
338 // Now, create another edge object based on the one we just wrote, and use it to perform
339 // a replace operation.
340 ChampRelationship rel2 = ChampRelationship.create()
343 .withProperty("property-3", "value-3")
345 testGraph.replaceRelationship(rel2, Optional.empty());
347 // Retrieve the next event from the event stream and validate that it is what we expect.
348 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
349 assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
350 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
352 // Finally, delete our edge.
353 testGraph.deleteRelationship(rel2, Optional.empty());
355 // Retrieve the next event from the event stream and validate that it is what we expect.
356 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
357 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
358 assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
363 * This test validates that performing edge operations in the case where the data to be
364 * forwarded to the event stream is unavailable results in no event being generated, but
365 * does not otherwise create issues.
367 * @throws ChampMarshallingException
368 * @throws ChampSchemaViolationException
369 * @throws ChampObjectNotExistsException
370 * @throws InterruptedException
371 * @throws JsonParseException
372 * @throws JsonMappingException
373 * @throws IOException
374 * @throws ChampUnmarshallingException
375 * @throws ChampRelationshipNotExistsException
376 * @throws ChampTransactionException
379 public void edgeOperationsWithNullsTest() throws ChampMarshallingException,
380 ChampSchemaViolationException,
381 ChampObjectNotExistsException,
382 InterruptedException,
384 JsonMappingException,
386 ChampUnmarshallingException,
387 ChampRelationshipNotExistsException, ChampTransactionException {
389 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
391 testGraph.returnNulls();
393 // Create two vertices to act as the endpoints of our edge.
394 ChampObject obj1 = ChampObject.create()
397 .withProperty("p1", "v1")
398 .withProperty("p2", "v2")
401 ChampObject obj2 = ChampObject.create()
404 .withProperty("p3", "v3")
407 // Now, create an edge object and write it to the graph data store.
408 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
409 .property("property-1", "value-1")
410 .property("property-2", "value-2")
412 testGraph.storeRelationship(rel, Optional.empty());
414 // Check our simulated event stream to see if an event log was produced.
415 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
417 // Validate that we did not get an event from the stream.
418 assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
420 // Now, create another edge object based on the one we just wrote, and use it to perform
421 // a replace operation.
422 ChampRelationship rel2 = ChampRelationship.create()
425 .withProperty("property-3", "value-3")
427 testGraph.replaceRelationship(rel2, Optional.empty());
429 // Check our simulated event stream to see if an event log was produced.
430 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
432 // Validate that we did not get an event from the stream.
433 assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
438 * Validates that store/delete operations against partitions result in the expected events
439 * being published to the event stream.
441 * @throws ChampMarshallingException
442 * @throws ChampSchemaViolationException
443 * @throws ChampObjectNotExistsException
444 * @throws InterruptedException
445 * @throws JsonParseException
446 * @throws JsonMappingException
447 * @throws IOException
448 * @throws ChampUnmarshallingException
449 * @throws ChampRelationshipNotExistsException
450 * @throws ChampTransactionException
453 public void partitionOperationsTest() throws ChampMarshallingException,
454 ChampSchemaViolationException,
455 ChampObjectNotExistsException,
456 InterruptedException,
458 JsonMappingException,
460 ChampUnmarshallingException,
461 ChampRelationshipNotExistsException, ChampTransactionException {
463 // Create the vertices and edge objects that we need to create a partition.
464 ChampObject obj1 = ChampObject.create()
467 .withProperty("p1", "v1")
468 .withProperty("p2", "v2")
471 ChampObject obj2 = ChampObject.create()
474 .withProperty("p3", "v3")
477 // Now, create an edge object and write it to the graph data store.
478 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
479 .property("property-1", "value-1")
480 .property("property-2", "value-2")
483 // Now, create our partition object and store it in the graph.
484 ChampPartition partition = ChampPartition.create()
487 .withRelationship(rel)
489 testGraph.storePartition(partition, Optional.empty());
491 // Retrieve the next event from the event stream and validate that it is what we expect.
492 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
493 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
494 assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
496 // Now, delete our partition.
497 testGraph.deletePartition(partition, Optional.empty());
499 // Retrieve the next event from the event stream and validate that it is what we expect.
500 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
501 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
502 assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
507 * This test validates that performing partition operations in the case where the data to be
508 * forwarded to the event stream is unavailable results in no event being generated, but
509 * does not otherwise create issues.
511 * @throws ChampMarshallingException
512 * @throws ChampSchemaViolationException
513 * @throws ChampObjectNotExistsException
514 * @throws InterruptedException
515 * @throws JsonParseException
516 * @throws JsonMappingException
517 * @throws IOException
518 * @throws ChampUnmarshallingException
519 * @throws ChampRelationshipNotExistsException
520 * @throws ChampTransactionException
523 public void partitionOperationsWithNullsTest() throws ChampMarshallingException,
524 ChampSchemaViolationException,
525 ChampObjectNotExistsException,
526 InterruptedException,
528 JsonMappingException,
530 ChampUnmarshallingException,
531 ChampRelationshipNotExistsException, ChampTransactionException {
533 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
535 testGraph.returnNulls();
537 // Create all of the objects we need to create a partition, and store the partition
539 ChampObject obj1 = ChampObject.create()
542 .withProperty("p1", "v1")
543 .withProperty("p2", "v2")
546 ChampObject obj2 = ChampObject.create()
549 .withProperty("p3", "v3")
552 ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
553 .property("property-1", "value-1")
554 .property("property-2", "value-2")
557 ChampPartition partition = ChampPartition.create()
560 .withRelationship(rel)
562 testGraph.storePartition(partition, Optional.empty());
564 // Check our simulated event stream to see if an event log was produced.
565 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
567 // Validate that we did not get an event from the stream.
568 assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
573 * Validates that store/delete operations against vertex indexes result in the expected
574 * events being published to the event stream.
576 * @throws ChampMarshallingException
577 * @throws ChampSchemaViolationException
578 * @throws ChampObjectNotExistsException
579 * @throws InterruptedException
580 * @throws JsonParseException
581 * @throws JsonMappingException
582 * @throws IOException
583 * @throws ChampUnmarshallingException
584 * @throws ChampRelationshipNotExistsException
585 * @throws ChampIndexNotExistsException
588 public void indexOperationsTest() throws ChampMarshallingException,
589 ChampSchemaViolationException,
590 ChampObjectNotExistsException,
591 InterruptedException,
593 JsonMappingException,
595 ChampUnmarshallingException,
596 ChampRelationshipNotExistsException,
597 ChampIndexNotExistsException {
599 // Create an index object and store it in the graph.
600 ChampObjectIndex objIndex = ChampObjectIndex.create()
605 testGraph.storeObjectIndex(objIndex);
607 // Retrieve the next event from the event stream and validate that it is what we expect.
608 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
609 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
610 assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
612 // Now, delete our index.
613 testGraph.deleteObjectIndex("myIndex");
615 // Retrieve the next event from the event stream and validate that it is what we expect.
616 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
617 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
618 assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
622 * This test validates that performing index operations in the case where the data to be
623 * forwarded to the event stream is unavailable results in no event being generated, but
624 * does not otherwise create issues.
626 * @throws ChampMarshallingException
627 * @throws ChampSchemaViolationException
628 * @throws ChampObjectNotExistsException
629 * @throws InterruptedException
630 * @throws JsonParseException
631 * @throws JsonMappingException
632 * @throws IOException
633 * @throws ChampUnmarshallingException
634 * @throws ChampRelationshipNotExistsException
635 * @throws ChampIndexNotExistsException
638 public void indexOperationsWithNullsTest() throws ChampMarshallingException,
639 ChampSchemaViolationException,
640 ChampObjectNotExistsException,
641 InterruptedException,
643 JsonMappingException,
645 ChampUnmarshallingException,
646 ChampRelationshipNotExistsException,
647 ChampIndexNotExistsException {
649 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
651 testGraph.returnNulls();
653 // Create an index object and store it in the graph.
654 ChampObjectIndex objIndex = ChampObjectIndex.create()
659 testGraph.storeObjectIndex(objIndex);
661 // Check our simulated event stream to see if an event log was produced.
662 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
664 // Now, delete our index.
665 testGraph.deleteObjectIndex("myIndex");
667 // Check our simulated event stream to see if an event log was produced.
668 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
670 // Validate that we did not get an event from the stream.
671 assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
676 * Validates that store/delete operations against relationship indexes result in the
677 * expected events being published to the event stream; the no-event case is covered by
678 * relationshipIndexOperationsWithNullsTest.
680 * @throws ChampMarshallingException
681 * @throws ChampSchemaViolationException
682 * @throws ChampObjectNotExistsException
683 * @throws InterruptedException
684 * @throws JsonParseException
685 * @throws JsonMappingException
686 * @throws IOException
687 * @throws ChampUnmarshallingException
688 * @throws ChampRelationshipNotExistsException
689 * @throws ChampIndexNotExistsException
692 public void relationshipIndexOperationsTest() throws ChampMarshallingException,
693 ChampSchemaViolationException,
694 ChampObjectNotExistsException,
695 InterruptedException,
697 JsonMappingException,
699 ChampUnmarshallingException,
700 ChampRelationshipNotExistsException,
701 ChampIndexNotExistsException {
703 // Create a relationship index object and store it in the graph.
704 ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
709 testGraph.storeRelationshipIndex(relIndex);
711 // Retrieve the next event from the event stream and validate that it is what we expect.
712 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
713 assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
714 assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
716 // Now, delete our relationship index.
717 testGraph.deleteRelationshipIndex("myIndex");
719 // Retrieve the next event from the event stream and validate that it is what we expect.
720 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
721 assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
722 assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
727 * This test validates that performing relationship index operations in the case where
728 * the data to be forwarded to the event stream is unavailable results in no event being
729 * generated, but does not otherwise create issues.
731 * @throws ChampMarshallingException
732 * @throws ChampSchemaViolationException
733 * @throws ChampObjectNotExistsException
734 * @throws InterruptedException
735 * @throws JsonParseException
736 * @throws JsonMappingException
737 * @throws IOException
738 * @throws ChampUnmarshallingException
739 * @throws ChampRelationshipNotExistsException
740 * @throws ChampIndexNotExistsException
743 public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException,
744 ChampSchemaViolationException,
745 ChampObjectNotExistsException,
746 InterruptedException,
748 JsonMappingException,
750 ChampUnmarshallingException,
751 ChampRelationshipNotExistsException,
752 ChampIndexNotExistsException {
754 // Set up our graph to simulate a failure to retrieve some of the data we need to generate
756 testGraph.returnNulls();
758 // Create a relationship index object and store it in the graph.
759 ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
765 testGraph.storeRelationshipIndex(relIndex);
767 // Check our simulated event stream to see if an event log was produced.
768 String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
770 // Now, delete our index.
771 testGraph.deleteRelationshipIndex("myIndex");
773 // Check our simulated event stream to see if an event log was produced.
774 loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
776 // Validate that we did not get an event from the stream.
777 assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
782 * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which
783 * we can use to validate that log events get generated without worrying about having a real
786 private class TestGraph extends AbstractLoggingChampGraph {
788 /** If set, this causes simulated retrieve operations to fail. */
789 private boolean returnNulls = false;
792 protected TestGraph(Map<String, Object> properties) {
796 public void returnNulls() {
801 public void shutdown() {
803 publisherPool = null;
809 public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction)
810 throws ChampMarshallingException,
811 ChampSchemaViolationException,
812 ChampObjectNotExistsException {
822 public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction)
823 throws ChampMarshallingException,
824 ChampSchemaViolationException,
825 ChampObjectNotExistsException {
835 public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
836 return retrieveObject(key, Optional.empty());
840 public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
843 return(Optional.of(ChampObject.create()
848 return Optional.empty();
853 public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
858 public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
859 return queryObjects(queryParams, Optional.empty());
864 public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
865 // Not used by any tests.
870 public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
871 throws ChampUnmarshallingException,
872 ChampMarshallingException,
873 ChampObjectNotExistsException,
874 ChampSchemaViolationException,
875 ChampRelationshipNotExistsException {
885 public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
886 throws ChampUnmarshallingException,
887 ChampMarshallingException,
888 ChampSchemaViolationException,
889 ChampRelationshipNotExistsException {
899 public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
900 return retrieveRelationship(key, Optional.empty());
904 public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
905 // Not used by any tests.
910 public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
911 // Not used by any tests.
915 public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
916 throws ChampUnmarshallingException, ChampObjectNotExistsException {
917 return retrieveRelationships(object, Optional.empty());
921 public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
922 throws ChampUnmarshallingException, ChampObjectNotExistsException {
924 // Not used by any tests.
929 public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
930 return queryRelationships(queryParams, Optional.empty());
934 public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
936 // Not used by any tests.
941 public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction)
942 throws ChampSchemaViolationException,
943 ChampRelationshipNotExistsException,
944 ChampMarshallingException,
945 ChampObjectNotExistsException {
955 public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
956 // Not used by any tests.
960 public void executeStoreObjectIndex(ChampObjectIndex index) {
961 // Not used by any tests.
965 public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
968 return Optional.of(ChampObjectIndex.create()
970 .onType("doesnt matter")
971 .forField("doesnt matter")
974 return Optional.empty();
979 public Stream<ChampObjectIndex> retrieveObjectIndices() {
980 // Not used by any tests.
985 public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
986 // Not used by any tests.
990 public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
991 // Not used by any tests.
995 public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
997 return Optional.of(ChampRelationshipIndex.create()
999 .onType("doesnt matter")
1000 .forField("doesnt matter")
1003 return Optional.empty();
1008 public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
1009 // Not used by any tests.
1014 public void executeDeleteRelationshipIndex(String indexName)
1015 throws ChampIndexNotExistsException {
1016 // Not used by any tests.
1020 public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1021 // Not used by any tests.
1025 public ChampSchema retrieveSchema() {
1026 // Not used by any tests.
1031 public void updateSchema(ChampObjectConstraint objectConstraint)
1032 throws ChampSchemaViolationException {
1033 // Not used by any tests.
1037 public void updateSchema(ChampRelationshipConstraint schema)
1038 throws ChampSchemaViolationException {
1039 // Not used by any tests.
1043 public void deleteSchema() {
1044 // Not used by any tests.
1048 public ChampCapabilities capabilities() {
1049 // Not used by any tests.
1054 public ChampTransaction openTransaction() {
1055 // Not used by any tests.
1060 public void commitTransaction(ChampTransaction transaction) {
1061 // Not used by any tests.
1066 public void rollbackTransaction(ChampTransaction transaction) {
1067 // Not used by any tests.
1071 private class InMemoryPublisher implements EventPublisher {
1073 public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
1074 public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
1075 private boolean failMode=false;
1078 public void enterFailMode() {
1083 public int sendSync(String partitionKey, String message) throws Exception {
1086 eventStream.add(message);
1089 failedMsgs.add(message);
1090 throw new IOException("nope");
1095 public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
1097 for(String msg : messages) {
1099 eventStream.add(msg);
1102 failedMsgs.add(msg);
1103 throw new IOException("nope");
1110 public int sendSync(String message) throws Exception {
1112 eventStream.add(message);
1115 failedMsgs.add(message);
1116 throw new IOException("nope");
1121 public int sendSync(Collection<String> messages) throws Exception {
1123 for(String msg : messages) {
1125 eventStream.add(msg);
1128 failedMsgs.add(msg);
1129 throw new IOException("nope");
1135 public void sendAsync(String partitionKey, String message) throws Exception {
1137 eventStream.add(message);
1139 failedMsgs.add(message);
1140 throw new IOException("nope");
1144 public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
1145 for(String msg : messages) {
1147 eventStream.add(msg);
1149 failedMsgs.add(msg);
1150 throw new IOException("nope");
1155 public void sendAsync(String message) throws Exception {
1157 eventStream.add(message);
1159 failedMsgs.add(message);
1160 throw new IOException("nope");
1164 public void sendAsync(Collection<String> messages) throws Exception {
1165 for(String msg : messages) {
1167 eventStream.add(msg);
1169 failedMsgs.add(msg);
1170 throw new IOException("nope");
1176 public void close() throws Exception {
1177 // TODO Auto-generated method stub