Update license date and text
[aai/champ.git] / champ-lib / champ-core / src / test / java / org / onap / aai / champcore / event / AbstractLoggingChampGraphTest.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.util.*;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.stream.Stream;
32
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
40 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champcore.exceptions.ChampTransactionException;
44 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
45 import org.onap.aai.champcore.model.ChampObject;
46 import org.onap.aai.champcore.model.ChampObjectConstraint;
47 import org.onap.aai.champcore.model.ChampObjectIndex;
48 import org.onap.aai.champcore.model.ChampPartition;
49 import org.onap.aai.champcore.model.ChampRelationship;
50 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
51 import org.onap.aai.champcore.model.ChampRelationshipIndex;
52 import org.onap.aai.champcore.model.ChampSchema;
53
54 import org.onap.aai.event.api.EventPublisher;
55 import com.fasterxml.jackson.core.JsonParseException;
56 import com.fasterxml.jackson.databind.JsonMappingException;
57
58
59 public class AbstractLoggingChampGraphTest {
60
61   /** Event stream producer stub. */
62   private InMemoryPublisher producer;
63   
64   /** In memory graph for testing purposes. */
65   private TestGraph testGraph;
66   
67   
68   /**
69    * Perform any setup tasks that need to be done prior to each test.
70    */
71   @Before
72   public void setup() {
73     
74     // Instantiate an event stream producer stub to use in our tests.
75     producer = new InMemoryPublisher();
76     
77     // Instantiate an 'in-memory' graph for test purposes.
78     Map<String, Object> graphProperties = new HashMap<String, Object>();
79     graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
80     graphProperties.put("champcore.event.stream.batch-size", 1);
81     graphProperties.put("champcore.event.stream.publisher", producer);
82     testGraph = new TestGraph(graphProperties);
83   }
84   
85   
86   /**
87    * Perform any cleanup that needs to be done after each test.
88    * 
89    * @throws Exception 
90    */
91   @After
92   public void tearDown() throws Exception {
93     
94     // Close our stubbed producer and graph.
95     producer.close();
96     testGraph.shutdown();
97   }
98   
99  
100   /**
101    * Validates that store/replace/delete operation against vertices result in the expected events
102    * being published to the event stream.
103    * 
104    * @throws ChampMarshallingException
105    * @throws ChampSchemaViolationException
106    * @throws ChampObjectNotExistsException
107    * @throws InterruptedException
108    * @throws JsonParseException
109    * @throws JsonMappingException
110    * @throws IOException
111    * @throws ChampTransactionException 
112    */
113   @Test
114   public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException, 
115                                                              ChampSchemaViolationException, 
116                                                              ChampObjectNotExistsException, 
117                                                              InterruptedException, 
118                                                              JsonParseException, 
119                                                              JsonMappingException, 
120                                             IOException, 
121                                             ChampTransactionException {
122             
123     // Create a vertex and store it in the graph data store.
124     ChampObject obj1 = ChampObject.create()
125         .ofType("foo")
126         .withKey("123")
127         .withProperty("p1", "v1")
128         .withProperty("p2", "v2")
129         .build();  
130     testGraph.storeObject(obj1, Optional.empty());
131     
132     // Retrieve the next event from the event stream and validate that it is what we expect.
133     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
134     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
135     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
136   
137     
138     
139     // Create a new vertex based on the one that we already created.
140     ChampObject obj2 = ChampObject.create()
141         .from(obj1)
142         .withKey("123")
143         .withProperty("p3", "v3")
144         .build();
145     
146     // Now, try doing a replace operation.
147     testGraph.replaceObject(obj2, Optional.empty());
148     
149     
150     
151     // Retrieve the next event from the event stream and validate that it is what we expect.
152     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
153     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
154     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
155     
156     // Finally, delete the vertex.
157     testGraph.deleteObject("123", Optional.empty());
158     
159     // Retrieve the next event from the event stream and validate that it is what we expect.
160     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
161     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
162     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
163   }
164   
165   @Test
166   public void vertexOperationsLegacyTest2() throws ChampMarshallingException, 
167                                                    ChampSchemaViolationException, 
168                                                    ChampObjectNotExistsException, 
169                                                    InterruptedException, 
170                                                    JsonParseException, 
171                                                    JsonMappingException, 
172                                                    IOException, 
173                                                    ChampTransactionException {
174             
175     // Create a vertex and store it in the graph data store.
176     ChampObject obj1 = ChampObject.create()
177         .ofType("foo")
178         .withKey("123")
179         .withProperty("p1", "v1")
180         .withProperty("p2", "v2")
181         .build();  
182     testGraph.storeObject(obj1);
183     
184     // Retrieve the next event from the event stream and validate that it is what we expect.
185     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
186     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
187     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
188   
189     
190     
191     // Create a new vertex based on the one that we already created.
192     ChampObject obj2 = ChampObject.create()
193         .from(obj1)
194         .withKey("123")
195         .withProperty("p3", "v3")
196         .build();
197     
198     // Now, try doing a replace operation.
199     testGraph.replaceObject(obj2);
200     
201     
202     
203     // Retrieve the next event from the event stream and validate that it is what we expect.
204     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
205     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
206     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
207     
208     // Finally, delete the vertex.
209     testGraph.deleteObject("123");
210     
211     // Retrieve the next event from the event stream and validate that it is what we expect.
212     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
213     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
214     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
215   }
216   
217   /**
218    * This test validates that performing vertex operations in the case where the data to be
219    * forwarded to the event stream is unavailable results in no event being generated, but
220    * does not otherwise create issues.
221    * 
222    * @throws ChampMarshallingException
223    * @throws ChampSchemaViolationException
224    * @throws ChampObjectNotExistsException
225    * @throws InterruptedException
226    * @throws JsonParseException
227    * @throws JsonMappingException
228    * @throws IOException
229    * @throws ChampTransactionException 
230    */
231   @Test
232   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
233                                                      ChampSchemaViolationException, 
234                                                      ChampObjectNotExistsException, 
235                                                      InterruptedException, 
236                                                      JsonParseException, 
237                                                      JsonMappingException, 
238                                                      IOException, ChampTransactionException {
239             
240     // Setup our test graph to simulate failures to retrieve data from the graph data store.
241     testGraph.returnNulls();
242     
243     // Create a vertex and store it in the graph data store.
244     ChampObject obj1 = ChampObject.create()
245         .ofType("foo")
246         .withKey("123")
247         .withProperty("p1", "v1")
248         .withProperty("p2", "v2")
249         .build();  
250     testGraph.storeObject(obj1, Optional.empty());
251
252     // Check our simulated event stream to verify that an event log was produced.
253     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
254     
255     // Validate that we did not get an event from the stream.
256     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
257     
258     // Create a new vertex based on the one that we already created.
259     ChampObject obj2 = ChampObject.create()
260         .from(obj1)
261         .withKey("123")
262         .withProperty("p3", "v3")
263         .build();
264     
265     // Now, try doing a replace operation.
266     testGraph.replaceObject(obj2, Optional.empty());
267     
268     // Check our simulated event stream to see if an event log was not produced.
269     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
270     
271     // Validate that we did not get an event from the stream.
272     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
273     
274     // Finally, delete the vertex.
275     testGraph.deleteObject("123", Optional.empty());
276     
277     // Check our simulated event stream to see if an event log was not produced.
278     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
279     
280     // Validate that we did not get an event from the stream.
281     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
282   }
283   
284   
285   /**
286    * Validates that store/replace/delete operation against edges result in the expected events
287    * being published to the event stream.
288    *
289    * @throws ChampMarshallingException
290    * @throws ChampSchemaViolationException
291    * @throws ChampObjectNotExistsException
292    * @throws InterruptedException
293    * @throws JsonParseException
294    * @throws JsonMappingException
295    * @throws IOException
296    * @throws ChampUnmarshallingException
297    * @throws ChampRelationshipNotExistsException
298    * @throws ChampTransactionException 
299    */
300   @Test
301   public void edgeOperationsTest() throws ChampMarshallingException, 
302                                           ChampSchemaViolationException, 
303                                           ChampObjectNotExistsException, 
304                                           InterruptedException, 
305                                           JsonParseException, 
306                                           JsonMappingException, 
307                                           IOException, 
308                                           ChampUnmarshallingException, 
309                                           ChampRelationshipNotExistsException, ChampTransactionException {
310     
311     // Create two vertices to act as the end points of our edge.
312     ChampObject obj1 = ChampObject.create()
313         .ofType("foo")
314         .withKey("123")
315         .withProperty("p1", "v1")
316         .withProperty("p2", "v2")
317         .build();  
318
319     ChampObject obj2 = ChampObject.create()
320         .ofType("bar")
321         .withKey("123")
322         .withProperty("p3", "v3")
323         .build();
324     
325     // Now, create an edge object and write it to the graph data store.
326     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
327         .property("property-1", "value-1")
328         .property("property-2", "value-2")
329         .build();
330     testGraph.storeRelationship(rel, Optional.empty());
331     
332     // Retrieve the next event from the event stream and validate that it is what we expect.
333     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
334     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
335     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
336     
337     // Now, create another edge object based on the one we just wrote, and use it to perform
338     // a replace operation.
339     ChampRelationship rel2 = ChampRelationship.create()
340         .from(rel)
341         .withKey("123")
342         .withProperty("property-3", "value-3")
343         .build();
344     testGraph.replaceRelationship(rel2, Optional.empty());
345
346     // Retrieve the next event from the event stream and validate that it is what we expect.
347     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
348     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
349     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
350     
351     // Finally, delete our edge.
352     testGraph.deleteRelationship(rel2, Optional.empty());
353     
354     // Retrieve the next event from the event stream and validate that it is what we expect.
355     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
356     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
357     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
358   }
359   
360   
361   /**
362    * This test validates that performing edge operations in the case where the data to be
363    * forwarded to the event stream is unavailable results in no event being generated, but
364    * does not otherwise create issues.
365    * 
366    * @throws ChampMarshallingException
367    * @throws ChampSchemaViolationException
368    * @throws ChampObjectNotExistsException
369    * @throws InterruptedException
370    * @throws JsonParseException
371    * @throws JsonMappingException
372    * @throws IOException
373    * @throws ChampUnmarshallingException
374    * @throws ChampRelationshipNotExistsException
375    * @throws ChampTransactionException 
376    */
377   @Test
378   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
379                                                    ChampSchemaViolationException, 
380                                                    ChampObjectNotExistsException, 
381                                                    InterruptedException, 
382                                                    JsonParseException, 
383                                                    JsonMappingException, 
384                                                    IOException, 
385                                                    ChampUnmarshallingException, 
386                                                    ChampRelationshipNotExistsException, ChampTransactionException {
387     
388     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
389     // events.
390     testGraph.returnNulls();
391     
392     // Create two vertices to act as the endpoints of our edge.
393     ChampObject obj1 = ChampObject.create()
394         .ofType("foo")
395         .withKey("123")
396         .withProperty("p1", "v1")
397         .withProperty("p2", "v2")
398         .build();  
399
400     ChampObject obj2 = ChampObject.create()
401         .ofType("bar")
402         .withKey("123")
403         .withProperty("p3", "v3")
404         .build();
405     
406     // Now, create an edge object and write it to the graph data store.
407     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
408         .property("property-1", "value-1")
409         .property("property-2", "value-2")
410         .build();
411     testGraph.storeRelationship(rel, Optional.empty());
412     
413     // Check our simulated event stream to see if an event log was produced.
414     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
415     
416     // Validate that we did not get an event from the stream.
417     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
418         
419     // Now, create another edge object based on the one we just wrote, and use it to perform
420     // a replace operation.
421     ChampRelationship rel2 = ChampRelationship.create()
422         .from(rel)
423         .withKey("123")
424         .withProperty("property-3", "value-3")
425         .build();
426     testGraph.replaceRelationship(rel2, Optional.empty());
427     
428     // Check our simulated event stream to see if an event log was produced.
429     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
430     
431     // Validate that we did not get an event from the stream.
432     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);   
433   }
434   
435   
436   /**
437    * Validates that store/replace/delete operation against partitions result in the expected events
438    * being published to the event stream.
439    *
440    * @throws ChampMarshallingException
441    * @throws ChampSchemaViolationException
442    * @throws ChampObjectNotExistsException
443    * @throws InterruptedException
444    * @throws JsonParseException
445    * @throws JsonMappingException
446    * @throws IOException
447    * @throws ChampUnmarshallingException
448    * @throws ChampRelationshipNotExistsException
449    * @throws ChampTransactionException 
450    */
451   @Test
452   public void partitionOperationsTest() throws ChampMarshallingException, 
453                                                ChampSchemaViolationException, 
454                                                ChampObjectNotExistsException, 
455                                                InterruptedException, 
456                                                JsonParseException, 
457                                                JsonMappingException, 
458                                                IOException, 
459                                                ChampUnmarshallingException, 
460                                                ChampRelationshipNotExistsException, ChampTransactionException {
461     
462     // Create the vertices and edge objects that we need to create a partition.
463     ChampObject obj1 = ChampObject.create()
464         .ofType("foo")
465         .withKey("123")
466         .withProperty("p1", "v1")
467         .withProperty("p2", "v2")
468         .build();  
469
470     ChampObject obj2 = ChampObject.create()
471         .ofType("bar")
472         .withKey("123")
473         .withProperty("p3", "v3")
474         .build();
475     
476     // Now, create an edge object and write it to the graph data store.
477     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
478         .property("property-1", "value-1")
479         .property("property-2", "value-2")
480         .build();
481     
482     // Now, create our partition object and store it in the graph.
483     ChampPartition partition = ChampPartition.create()
484         .withObject(obj1)
485         .withObject(obj2)
486         .withRelationship(rel)
487         .build();
488     testGraph.storePartition(partition, Optional.empty());
489     
490     // Retrieve the next event from the event stream and validate that it is what we expect.
491     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
492     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
493     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
494
495     // Now, delete our partition.
496     testGraph.deletePartition(partition, Optional.empty());
497     
498     // Retrieve the next event from the event stream and validate that it is what we expect.
499     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
500     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
501     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
502   }
503   
504   
505   /**
506    * This test validates that performing partition operations in the case where the data to be
507    * forwarded to the event stream is unavailable results in no event being generated, but
508    * does not otherwise create issues.
509    * 
510    * @throws ChampMarshallingException
511    * @throws ChampSchemaViolationException
512    * @throws ChampObjectNotExistsException
513    * @throws InterruptedException
514    * @throws JsonParseException
515    * @throws JsonMappingException
516    * @throws IOException
517    * @throws ChampUnmarshallingException
518    * @throws ChampRelationshipNotExistsException
519    * @throws ChampTransactionException 
520    */
521   @Test
522   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
523                                           ChampSchemaViolationException, 
524                                           ChampObjectNotExistsException, 
525                                           InterruptedException, 
526                                           JsonParseException, 
527                                           JsonMappingException, 
528                                           IOException, 
529                                           ChampUnmarshallingException, 
530                                           ChampRelationshipNotExistsException, ChampTransactionException {
531     
532     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
533     // events.
534     testGraph.returnNulls();
535     
536     // Create all of the objects we need to create a partition, and store the partition
537     // in the graph.
538     ChampObject obj1 = ChampObject.create()
539         .ofType("foo")
540         .withKey("123")
541         .withProperty("p1", "v1")
542         .withProperty("p2", "v2")
543         .build();  
544
545     ChampObject obj2 = ChampObject.create()
546         .ofType("bar")
547         .withKey("123")
548         .withProperty("p3", "v3")
549         .build();
550     
551     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
552         .property("property-1", "value-1")
553         .property("property-2", "value-2")
554         .build();
555     
556     ChampPartition partition = ChampPartition.create()
557         .withObject(obj1)
558         .withObject(obj2)
559         .withRelationship(rel)
560         .build();
561     testGraph.storePartition(partition, Optional.empty());
562     
563     // Check our simulated event stream to see if an an event log was produced.
564     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
565     
566     // Validate that we did not get an event from the stream.
567     assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
568   }
569   
570   
571   /**
572    * Validates that store/replace/delete operation against vertex indexes result in the expected
573    * events being published to the event stream.
574    * 
575    * @throws ChampMarshallingException
576    * @throws ChampSchemaViolationException
577    * @throws ChampObjectNotExistsException
578    * @throws InterruptedException
579    * @throws JsonParseException
580    * @throws JsonMappingException
581    * @throws IOException
582    * @throws ChampUnmarshallingException
583    * @throws ChampRelationshipNotExistsException
584    * @throws ChampIndexNotExistsException
585    */
586   @Test
587   public void indexOperationsTest() throws ChampMarshallingException, 
588                                            ChampSchemaViolationException, 
589                                            ChampObjectNotExistsException, 
590                                            InterruptedException, 
591                                            JsonParseException, 
592                                            JsonMappingException, 
593                                            IOException, 
594                                            ChampUnmarshallingException, 
595                                            ChampRelationshipNotExistsException, 
596                                            ChampIndexNotExistsException {
597         
598     // Create an index object and store it in the graph.
599     ChampObjectIndex objIndex = ChampObjectIndex.create()
600         .ofName("myIndex")
601         .onType("type")
602         .forField("myField")
603         .build();
604     testGraph.storeObjectIndex(objIndex);
605     
606     // Retrieve the next event from the event stream and validate that it is what we expect.
607     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
608     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
609     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
610     
611     // Now, delete our partition.
612     testGraph.deleteObjectIndex("myIndex");
613     
614     // Retrieve the next event from the event stream and validate that it is what we expect.
615     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
616     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
617     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
618   }
619   
620   /**
621    * This test validates that performing index operations in the case where the data to be
622    * forwarded to the event stream is unavailable results in no event being generated, but
623    * does not otherwise create issues.
624    * 
625    * @throws ChampMarshallingException
626    * @throws ChampSchemaViolationException
627    * @throws ChampObjectNotExistsException
628    * @throws InterruptedException
629    * @throws JsonParseException
630    * @throws JsonMappingException
631    * @throws IOException
632    * @throws ChampUnmarshallingException
633    * @throws ChampRelationshipNotExistsException
634    * @throws ChampIndexNotExistsException
635    */
636   @Test
637   public void indexOperationsWithNullsTest() throws ChampMarshallingException, 
638                                                     ChampSchemaViolationException, 
639                                                     ChampObjectNotExistsException, 
640                                                     InterruptedException, 
641                                                     JsonParseException, 
642                                                     JsonMappingException, 
643                                                     IOException, 
644                                                     ChampUnmarshallingException, 
645                                                     ChampRelationshipNotExistsException, 
646                                                     ChampIndexNotExistsException {
647     
648     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
649     // events.
650     testGraph.returnNulls();
651     
652     // Create an index object and store it in the graph.
653     ChampObjectIndex objIndex = ChampObjectIndex.create()
654         .ofName("myIndex")
655         .onType("type")
656         .forField("myField")
657         .build();
658     testGraph.storeObjectIndex(objIndex);
659     
660     // Check our simulated event stream to see if an  an event log was produced.
661     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
662     
663     // Now, delete our index.
664     testGraph.deleteObjectIndex("myIndex");
665     
666     // Check our simulated event stream to see if an an event log was produced.
667     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
668     
669     // Validate that we did not get an event from the stream.
670     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
671   }
672   
673   
674   /**
675    * This test validates that performing relationship index operations in the case where 
676    * the data to be forwarded to the event stream is unavailable results in no event being 
677    * generated, but does not otherwise create issues.
678    * 
679    * @throws ChampMarshallingException
680    * @throws ChampSchemaViolationException
681    * @throws ChampObjectNotExistsException
682    * @throws InterruptedException
683    * @throws JsonParseException
684    * @throws JsonMappingException
685    * @throws IOException
686    * @throws ChampUnmarshallingException
687    * @throws ChampRelationshipNotExistsException
688    * @throws ChampIndexNotExistsException
689    */
690   @Test
691   public void relationshipIndexOperationsTest() throws ChampMarshallingException, 
692                                                        ChampSchemaViolationException, 
693                                                        ChampObjectNotExistsException, 
694                                                        InterruptedException, 
695                                                        JsonParseException, 
696                                                        JsonMappingException, 
697                                                        IOException, 
698                                                        ChampUnmarshallingException, 
699                                                        ChampRelationshipNotExistsException, 
700                                                        ChampIndexNotExistsException {
701         
702     // Create a relationship index object and store it in the graph.
703     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
704         .ofName("myIndex")
705         .onType("type")
706         .forField("myField")
707         .build();
708     testGraph.storeRelationshipIndex(relIndex);
709     
710     // Retrieve the next event from the event stream and validate that it is what we expect.
711     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
712     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
713     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
714     
715     // Now, delete our partition.
716     testGraph.deleteRelationshipIndex("myIndex");
717     
718     // Retrieve the next event from the event stream and validate that it is what we expect.
719     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
720     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
721     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
722   }
723   
724   
725   /**
726    * This test validates that performing index operations in the case where the data to be
727    * forwarded to the event stream is unavailable results in no event being generated, but
728    * does not otherwise create issues.
729    * 
730    * @throws ChampMarshallingException
731    * @throws ChampSchemaViolationException
732    * @throws ChampObjectNotExistsException
733    * @throws InterruptedException
734    * @throws JsonParseException
735    * @throws JsonMappingException
736    * @throws IOException
737    * @throws ChampUnmarshallingException
738    * @throws ChampRelationshipNotExistsException
739    * @throws ChampIndexNotExistsException
740    */
741   @Test
742   public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException, 
743                                                                 ChampSchemaViolationException, 
744                                                                 ChampObjectNotExistsException, 
745                                                                 InterruptedException, 
746                                                                 JsonParseException, 
747                                                                 JsonMappingException, 
748                                                                 IOException, 
749                                                                 ChampUnmarshallingException, 
750                                                                 ChampRelationshipNotExistsException, 
751                                                                 ChampIndexNotExistsException {
752     
753     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
754     // events.
755     testGraph.returnNulls();
756     
757     // Create a relationship index object and store it in the graph.
758     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
759         .ofName("myIndex")
760         .onType("type")
761         .forField("myField")
762         .build();
763     
764     testGraph.storeRelationshipIndex(relIndex);
765     
766     // Check our simulated event stream to see if an an event log was produced.
767     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
768     
769     // Now, delete our index.
770     testGraph.deleteRelationshipIndex("myIndex");
771     
772     // Check our simulated event stream to see if an event log was produced.
773     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
774     
775     // Validate that we did not get an event from the stream.
776     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
777   }
778       
779   
780   /**
781    * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which 
782    * we can use to validate that log events get generated without worrying about having a real
783    * underlying graph.
784    */
785   private class TestGraph extends AbstractLoggingChampGraph {
786     
787     /** If set, this causes simulated retrieve operations to fail. */
788     private boolean returnNulls = false;
789     
790     
791     protected TestGraph(Map<String, Object> properties) {
792       super(properties);      
793     }
794
795     public void returnNulls() {
796       returnNulls = true;
797     }
798     
799     @Override 
800     public void shutdown() {
801       if(returnNulls) {
802         publisherPool = null;
803       }
804       super.shutdown();
805     }
806     
807     @Override
808     public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) 
809         throws ChampMarshallingException,
810                ChampSchemaViolationException, 
811                ChampObjectNotExistsException {
812       
813       if(!returnNulls) {
814         return object;
815       } else {
816         return null;
817       }
818     }
819
820     @Override
821     public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) 
822         throws ChampMarshallingException,
823                ChampSchemaViolationException, 
824                ChampObjectNotExistsException {
825       
826       if(!returnNulls) {
827         return object;
828       } else {
829         return null;
830       }
831     }
832
833     @Override
834     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
835       return retrieveObject(key, Optional.empty());
836     }
837     
838     @Override
839     public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
840       
841       if(!returnNulls) {
842         return(Optional.of(ChampObject.create()
843                             .ofType("foo")
844                             .withKey(key)
845                             .build()));  
846       } else {
847         return Optional.empty();
848       }
849     }
850
851     @Override
852     public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
853    
854     }
855
856     @Override
857     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
858       return queryObjects(queryParams, Optional.empty());
859     }
860
861    
862     @Override
863     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
864       // Not used by any tests.
865       return null;
866     }
867
868     @Override
869     public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
870         throws ChampUnmarshallingException, 
871                ChampMarshallingException, 
872                ChampObjectNotExistsException, 
873                ChampSchemaViolationException,
874                ChampRelationshipNotExistsException {
875
876       if(!returnNulls) {
877         return relationship;
878       } else {
879         return null;
880       }
881     }
882
883     @Override
884     public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
885         throws ChampUnmarshallingException, 
886                ChampMarshallingException,
887                ChampSchemaViolationException, 
888                ChampRelationshipNotExistsException {
889
890       if(!returnNulls) {
891         return relationship;
892       } else {
893         return null;
894       }
895     }
896
897     @Override
898     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
899       return retrieveRelationship(key, Optional.empty());
900     }
901     
902     @Override
903     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
904       // Not used by any tests.
905       return null;
906     }
907
908     @Override
909     public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
910       // Not used by any tests.   
911     }
912
913     @Override
914     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
915         throws ChampUnmarshallingException, ChampObjectNotExistsException {
916       return retrieveRelationships(object, Optional.empty());
917     }
918     
919     @Override
920     public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
921         throws ChampUnmarshallingException, ChampObjectNotExistsException {
922       
923       // Not used by any tests.
924       return null;
925     }
926
927     @Override
928     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
929       return queryRelationships(queryParams, Optional.empty());
930     }
931     
932     @Override
933     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
934       
935       // Not used by any tests.
936       return null;
937     }
938
939     @Override
940     public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) 
941         throws ChampSchemaViolationException, 
942                ChampRelationshipNotExistsException,
943                ChampMarshallingException, 
944                ChampObjectNotExistsException {
945
946       if(!returnNulls) {
947         return partition;
948       } else {
949         return null;
950       }
951     }
952
953     @Override
954     public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
955       // Not used by any tests.     
956     }
957
958     @Override
959     public void executeStoreObjectIndex(ChampObjectIndex index) {
960       // Not used by any tests.    
961     }
962
963     @Override
964     public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
965       
966       if(!returnNulls) {
967         return Optional.of(ChampObjectIndex.create()
968                             .ofName(indexName)
969                             .onType("doesnt matter")
970                             .forField("doesnt matter")
971                             .build());
972       } else {
973         return Optional.empty();
974       }
975     }
976
977     @Override
978     public Stream<ChampObjectIndex> retrieveObjectIndices() {
979       // Not used by any tests.
980       return null;
981     }
982
983     @Override
984     public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
985       // Not used by any tests.    
986     }
987
988     @Override
989     public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
990       // Not used by any tests.  
991     }
992
993     @Override
994     public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
995       if(!returnNulls) {
996         return Optional.of(ChampRelationshipIndex.create()
997                             .ofName(indexName)
998                             .onType("doesnt matter")
999                             .forField("doesnt matter")
1000                             .build());
1001       } else {
1002         return Optional.empty();
1003       }
1004     }
1005
1006     @Override
1007     public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
1008       // Not used by any tests.
1009       return null;
1010     }
1011
1012     @Override
1013     public void executeDeleteRelationshipIndex(String indexName)
1014         throws ChampIndexNotExistsException {
1015       // Not used by any tests.    
1016     }
1017
1018     @Override
1019     public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1020       // Not used by any tests.    
1021     }
1022
1023     @Override
1024     public ChampSchema retrieveSchema() {
1025       // Not used by any tests.
1026       return null;
1027     }
1028
1029     @Override
1030     public void updateSchema(ChampObjectConstraint objectConstraint)
1031         throws ChampSchemaViolationException {
1032       // Not used by any tests.    
1033     }
1034
1035     @Override
1036     public void updateSchema(ChampRelationshipConstraint schema)
1037         throws ChampSchemaViolationException {
1038       // Not used by any tests.     
1039     }
1040
1041     @Override
1042     public void deleteSchema() {
1043       // Not used by any tests.  
1044     }
1045
1046     @Override
1047     public ChampCapabilities capabilities() {
1048       // Not used by any tests.
1049       return null;
1050     }
1051
1052     @Override
1053     public ChampTransaction openTransaction() {
1054       // Not used by any tests.
1055       return null;
1056     }
1057
1058     @Override
1059     public void commitTransaction(ChampTransaction transaction) {
1060       // Not used by any tests.
1061       
1062     }
1063
1064     @Override
1065     public void rollbackTransaction(ChampTransaction transaction) {
1066       // Not used by any tests.    
1067     }
1068   }
1069   
1070   private class InMemoryPublisher implements EventPublisher {
1071
1072     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
1073     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
1074     private boolean failMode=false;
1075     
1076     
1077     public void enterFailMode() {
1078       failMode=true;
1079     }
1080     
1081     @Override
1082     public int sendSync(String partitionKey, String message) throws Exception {
1083       
1084       if(!failMode) {
1085         eventStream.add(message);
1086         return 0;
1087       } else {
1088         failedMsgs.add(message);
1089         throw new IOException("nope");
1090       }
1091     }
1092     
1093     @Override
1094     public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
1095       
1096       for(String msg : messages) {
1097         if(!failMode) {
1098           eventStream.add(msg);
1099           return 0;
1100         } else {
1101           failedMsgs.add(msg);
1102           throw new IOException("nope");
1103         }
1104       }
1105       return 0;
1106     }
1107     
1108     @Override
1109     public int sendSync(String message) throws Exception {
1110       if(!failMode) {
1111         eventStream.add(message);
1112         return 0;
1113       } else {
1114         failedMsgs.add(message);
1115         throw new IOException("nope");
1116       }
1117     }
1118     
1119     @Override
1120     public int sendSync(Collection<String> messages) throws Exception {
1121       
1122       for(String msg : messages) {
1123         if(!failMode) {
1124           eventStream.add(msg);
1125           return 0;
1126         } else {
1127           failedMsgs.add(msg);
1128           throw new IOException("nope");
1129         }
1130       }
1131       return 0;
1132     }
1133     @Override
1134     public void sendAsync(String partitionKey, String message) throws Exception {
1135       if(!failMode) {
1136         eventStream.add(message);
1137       } else {
1138         failedMsgs.add(message);
1139         throw new IOException("nope");
1140       }      
1141     }
1142     @Override
1143     public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
1144       for(String msg : messages) {
1145         if(!failMode) {
1146           eventStream.add(msg);
1147         } else {
1148           failedMsgs.add(msg);
1149           throw new IOException("nope");
1150         }
1151       }     
1152     }
1153     @Override
1154     public void sendAsync(String message) throws Exception {
1155       if(!failMode) {
1156         eventStream.add(message);
1157       } else {
1158         failedMsgs.add(message);
1159         throw new IOException("nope");
1160       }      
1161     }
1162     @Override
1163     public void sendAsync(Collection<String> messages) throws Exception {
1164       for(String msg : messages) {
1165         if(!failMode) {
1166           eventStream.add(msg);
1167         } else {
1168           failedMsgs.add(msg);
1169           throw new IOException("nope");
1170         }
1171       }    
1172     }
1173     
1174     @Override
1175     public void close() throws Exception {
1176       // TODO Auto-generated method stub
1177       
1178     }    
1179   }
1180 }