[AAI-2175] Change aai champ container processes to run as non-root on the host
[aai/champ.git] / champ-lib / champ-core / src / test / java / org / onap / aai / champcore / event / AbstractLoggingChampGraphTest.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champcore.event;
22
23 import static org.junit.Assert.assertNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.util.*;
28 import java.util.concurrent.ArrayBlockingQueue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.stream.Stream;
32
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.onap.aai.champcore.ChampCapabilities;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
40 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champcore.exceptions.ChampTransactionException;
44 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
45 import org.onap.aai.champcore.model.ChampObject;
46 import org.onap.aai.champcore.model.ChampObjectConstraint;
47 import org.onap.aai.champcore.model.ChampObjectIndex;
48 import org.onap.aai.champcore.model.ChampPartition;
49 import org.onap.aai.champcore.model.ChampRelationship;
50 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
51 import org.onap.aai.champcore.model.ChampRelationshipIndex;
52 import org.onap.aai.champcore.model.ChampSchema;
53
54 import org.onap.aai.event.api.EventPublisher;
55 import com.fasterxml.jackson.core.JsonParseException;
56 import com.fasterxml.jackson.databind.JsonMappingException;
57
58
59 public class AbstractLoggingChampGraphTest {
60
61   /** Event stream producer stub. */
62   private InMemoryPublisher producer;
63   
64   /** In memory graph for testing purposes. */
65   private TestGraph testGraph;
66   
67   
68   /**
69    * Perform any setup tasks that need to be done prior to each test.
70    */
71   @Before
72   public void setup() {
73     
74     // Instantiate an event stream producer stub to use in our tests.
75     producer = new InMemoryPublisher();
76     
77     // Instantiate an 'in-memory' graph for test purposes.
78     Map<String, Object> graphProperties = new HashMap<String, Object>();
79     graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
80     graphProperties.put("champcore.event.stream.batch-size", 1);
81     graphProperties.put("champcore.event.stream.publisher", producer);
82     testGraph = new TestGraph(graphProperties);
83   }
84   
85   
86   /**
87    * Perform any cleanup that needs to be done after each test.
88    * 
89    * @throws Exception 
90    */
91   @After
92   public void tearDown() throws Exception {
93     
94     // Close our stubbed producer and graph.
95     producer.close();
96     testGraph.shutdown();
97   }
98   
99  
100   /**
101    * Validates that store/replace/delete operation against vertices result in the expected events
102    * being published to the event stream.
103    * 
104    * @throws ChampMarshallingException
105    * @throws ChampSchemaViolationException
106    * @throws ChampObjectNotExistsException
107    * @throws InterruptedException
108    * @throws JsonParseException
109    * @throws JsonMappingException
110    * @throws IOException
111    * @throws ChampTransactionException 
112    */
113   @Test
114   public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException, 
115                                                              ChampSchemaViolationException, 
116                                                              ChampObjectNotExistsException, 
117                                                              InterruptedException, 
118                                                              JsonParseException, 
119                                                              JsonMappingException, 
120                                             IOException, 
121                                             ChampTransactionException {
122             
123     // Create a vertex and store it in the graph data store.
124     ChampObject obj1 = ChampObject.create()
125         .ofType("foo")
126         .withKey("123")
127         .withProperty("p1", "v1")
128         .withProperty("p2", "v2")
129         .build();  
130     testGraph.storeObject(obj1, Optional.empty());
131     
132     // Retrieve the next event from the event stream and validate that it is what we expect.
133     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
134     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
135     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
136   
137     
138     
139     // Create a new vertex based on the one that we already created.
140     ChampObject obj2 = ChampObject.create()
141         .from(obj1)
142         .withKey("123")
143         .withProperty("p3", "v3")
144         .build();
145     
146     // Now, try doing a replace operation.
147     testGraph.replaceObject(obj2, Optional.empty());
148     
149     
150     
151     // Retrieve the next event from the event stream and validate that it is what we expect.
152     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
153     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
154     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
155     
156     // Finally, delete the vertex.
157     testGraph.deleteObject("123", Optional.empty());
158     
159     // Retrieve the next event from the event stream and validate that it is what we expect.
160     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
161     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
162     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
163   }
164   
165   @Test
166   public void vertexOperationsLegacyTest2() throws ChampMarshallingException, 
167                                                    ChampSchemaViolationException, 
168                                                    ChampObjectNotExistsException, 
169                                                    InterruptedException, 
170                                                    JsonParseException, 
171                                                    JsonMappingException, 
172                                                    IOException, 
173                                                    ChampTransactionException {
174             
175     // Create a vertex and store it in the graph data store.
176     ChampObject obj1 = ChampObject.create()
177         .ofType("foo")
178         .withKey("123")
179         .withProperty("p1", "v1")
180         .withProperty("p2", "v2")
181         .build();  
182     testGraph.storeObject(obj1);
183     
184     // Retrieve the next event from the event stream and validate that it is what we expect.
185     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
186     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
187     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
188   
189     
190     
191     // Create a new vertex based on the one that we already created.
192     ChampObject obj2 = ChampObject.create()
193         .from(obj1)
194         .withKey("123")
195         .withProperty("p3", "v3")
196         .build();
197     
198     // Now, try doing a replace operation.
199     testGraph.replaceObject(obj2);
200     
201     
202     
203     // Retrieve the next event from the event stream and validate that it is what we expect.
204     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
205     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
206     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
207     
208     // Finally, delete the vertex.
209     testGraph.deleteObject("123");
210     
211     // Retrieve the next event from the event stream and validate that it is what we expect.
212     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
213     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
214     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
215   }
216   
217   /**
218    * This test validates that performing vertex operations in the case where the data to be
219    * forwarded to the event stream is unavailable results in no event being generated, but
220    * does not otherwise create issues.
221    * 
222    * @throws ChampMarshallingException
223    * @throws ChampSchemaViolationException
224    * @throws ChampObjectNotExistsException
225    * @throws InterruptedException
226    * @throws JsonParseException
227    * @throws JsonMappingException
228    * @throws IOException
229    * @throws ChampTransactionException 
230    */
231   @Test
232   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
233                                                      ChampSchemaViolationException, 
234                                                      ChampObjectNotExistsException, 
235                                                      InterruptedException, 
236                                                      JsonParseException, 
237                                                      JsonMappingException, 
238                                                      IOException, ChampTransactionException {
239             
240     // Setup our test graph to simulate failures to retrieve data from the graph data store.
241     testGraph.returnNulls();
242     
243     // Create a vertex and store it in the graph data store.
244     ChampObject obj1 = ChampObject.create()
245         .ofType("foo")
246         .withKey("123")
247         .withProperty("p1", "v1")
248         .withProperty("p2", "v2")
249         .build();  
250     testGraph.storeObject(obj1, Optional.empty());
251
252     // Check our simulated event stream to verify that an event log was produced.
253     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
254     
255     // Validate that we did not get an event from the stream.
256     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
257     
258     // Create a new vertex based on the one that we already created.
259     ChampObject obj2 = ChampObject.create()
260         .from(obj1)
261         .withKey("123")
262         .withProperty("p3", "v3")
263         .build();
264     
265     // Now, try doing a replace operation.
266     testGraph.replaceObject(obj2, Optional.empty());
267     
268     // Check our simulated event stream to see if an event log was not produced.
269     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
270     
271     // Validate that we did not get an event from the stream.
272     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
273     
274     // Finally, delete the vertex.
275     testGraph.deleteObject("123", Optional.empty());
276     
277     // Check our simulated event stream to see if an event log was not produced.
278     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
279     
280     // Validate that we did not get an event from the stream.
281     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
282   }
283   
284   
285   /**
286    * Validates that store/replace/delete operation against edges result in the expected events
287    * being published to the event stream.
288    *
289    * @throws ChampMarshallingException
290    * @throws ChampSchemaViolationException
291    * @throws ChampObjectNotExistsException
292    * @throws InterruptedException
293    * @throws JsonParseException
294    * @throws JsonMappingException
295    * @throws IOException
296    * @throws ChampUnmarshallingException
297    * @throws ChampRelationshipNotExistsException
298    * @throws ChampTransactionException 
299    */
300   @Test
301   public void edgeOperationsTest() throws ChampMarshallingException, 
302                                           ChampSchemaViolationException, 
303                                           ChampObjectNotExistsException, 
304                                           InterruptedException, 
305                                           JsonParseException, 
306                                           JsonMappingException, 
307                                           IOException, 
308                                           ChampUnmarshallingException, 
309                                           ChampRelationshipNotExistsException, ChampTransactionException {
310     
311     // Create two vertices to act as the end points of our edge.
312     ChampObject obj1 = ChampObject.create()
313         .ofType("foo")
314         .withKey("123")
315         .withProperty("p1", "v1")
316         .withProperty("p2", "v2")
317         .build();  
318
319     ChampObject obj2 = ChampObject.create()
320         .ofType("bar")
321         .withKey("123")
322         .withProperty("p3", "v3")
323         .build();
324     
325     // Now, create an edge object and write it to the graph data store.
326     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
327         .property("property-1", "value-1")
328         .property("property-2", "value-2")
329         .build();
330     testGraph.storeRelationship(rel, Optional.empty());
331     
332     // Retrieve the next event from the event stream and validate that it is what we expect.
333     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
334     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
335     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
336     
337     // Now, create another edge object based on the one we just wrote, and use it to perform
338     // a replace operation.
339     ChampRelationship rel2 = ChampRelationship.create()
340         .from(rel)
341         .withKey("123")
342         .withProperty("property-3", "value-3")
343         .build();
344     testGraph.replaceRelationship(rel2, Optional.empty());
345
346     // Retrieve the next event from the event stream and validate that it is what we expect.
347     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
348     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
349     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
350     
351     // Finally, delete our edge.
352     testGraph.deleteRelationship(rel2, Optional.empty());
353     
354     // Retrieve the next event from the event stream and validate that it is what we expect.
355     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
356     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
357     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
358   }
359   
360   
361   /**
362    * This test validates that performing edge operations in the case where the data to be
363    * forwarded to the event stream is unavailable results in no event being generated, but
364    * does not otherwise create issues.
365    * 
366    * @throws ChampMarshallingException
367    * @throws ChampSchemaViolationException
368    * @throws ChampObjectNotExistsException
369    * @throws InterruptedException
370    * @throws JsonParseException
371    * @throws JsonMappingException
372    * @throws IOException
373    * @throws ChampUnmarshallingException
374    * @throws ChampRelationshipNotExistsException
375    * @throws ChampTransactionException 
376    */
377   @Test
378   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
379                                                    ChampSchemaViolationException, 
380                                                    ChampObjectNotExistsException, 
381                                                    InterruptedException, 
382                                                    JsonParseException, 
383                                                    JsonMappingException, 
384                                                    IOException, 
385                                                    ChampUnmarshallingException, 
386                                                    ChampRelationshipNotExistsException, ChampTransactionException {
387     
388     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
389     // events.
390     testGraph.returnNulls();
391     
392     // Create two vertices to act as the endpoints of our edge.
393     ChampObject obj1 = ChampObject.create()
394         .ofType("foo")
395         .withKey("123")
396         .withProperty("p1", "v1")
397         .withProperty("p2", "v2")
398         .build();  
399
400     ChampObject obj2 = ChampObject.create()
401         .ofType("bar")
402         .withKey("123")
403         .withProperty("p3", "v3")
404         .build();
405     
406     // Now, create an edge object and write it to the graph data store.
407     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
408         .property("property-1", "value-1")
409         .property("property-2", "value-2")
410         .build();
411     testGraph.storeRelationship(rel, Optional.empty());
412     
413     // Check our simulated event stream to see if an event log was produced.
414     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
415     
416     // Validate that we did not get an event from the stream.
417     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
418         
419     // Now, create another edge object based on the one we just wrote, and use it to perform
420     // a replace operation.
421     ChampRelationship rel2 = ChampRelationship.create()
422         .from(rel)
423         .withKey("123")
424         .withProperty("property-3", "value-3")
425         .build();
426     testGraph.replaceRelationship(rel2, Optional.empty());
427     
428     // Check our simulated event stream to see if an event log was produced.
429     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
430     
431     // Validate that we did not get an event from the stream.
432     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);   
433   }
434   
435   
436   /**
437    * Validates that store/replace/delete operation against partitions result in the expected events
438    * being published to the event stream.
439    *
440    * @throws ChampMarshallingException
441    * @throws ChampSchemaViolationException
442    * @throws ChampObjectNotExistsException
443    * @throws InterruptedException
444    * @throws JsonParseException
445    * @throws JsonMappingException
446    * @throws IOException
447    * @throws ChampUnmarshallingException
448    * @throws ChampRelationshipNotExistsException
449    * @throws ChampTransactionException 
450    */
451   @Test
452   public void partitionOperationsTest() throws ChampMarshallingException, 
453                                                ChampSchemaViolationException, 
454                                                ChampObjectNotExistsException, 
455                                                InterruptedException, 
456                                                JsonParseException, 
457                                                JsonMappingException, 
458                                                IOException, 
459                                                ChampUnmarshallingException, 
460                                                ChampRelationshipNotExistsException, ChampTransactionException {
461     
462     // Create the vertices and edge objects that we need to create a partition.
463     ChampObject obj1 = ChampObject.create()
464         .ofType("foo")
465         .withKey("123")
466         .withProperty("p1", "v1")
467         .withProperty("p2", "v2")
468         .build();  
469
470     ChampObject obj2 = ChampObject.create()
471         .ofType("bar")
472         .withKey("123")
473         .withProperty("p3", "v3")
474         .build();
475     
476     // Now, create an edge object and write it to the graph data store.
477     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
478         .property("property-1", "value-1")
479         .property("property-2", "value-2")
480         .build();
481     
482     // Now, create our partition object and store it in the graph.
483     ChampPartition partition = ChampPartition.create()
484         .withObject(obj1)
485         .withObject(obj2)
486         .withRelationship(rel)
487         .build();
488     testGraph.storePartition(partition, Optional.empty());
489     
490     // Retrieve the next event from the event stream and validate that it is what we expect.
491     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
492     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
493     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
494
495     // Now, delete our partition.
496     testGraph.deletePartition(partition, Optional.empty());
497     
498     // Retrieve the next event from the event stream and validate that it is what we expect.
499     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
500     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
501     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
502   }
503   
504   
505   /**
506    * This test validates that performing partition operations in the case where the data to be
507    * forwarded to the event stream is unavailable results in no event being generated, but
508    * does not otherwise create issues.
509    * 
510    * @throws ChampMarshallingException
511    * @throws ChampSchemaViolationException
512    * @throws ChampObjectNotExistsException
513    * @throws InterruptedException
514    * @throws JsonParseException
515    * @throws JsonMappingException
516    * @throws IOException
517    * @throws ChampUnmarshallingException
518    * @throws ChampRelationshipNotExistsException
519    * @throws ChampTransactionException 
520    */
521   @Test
522   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
523                                           ChampSchemaViolationException, 
524                                           ChampObjectNotExistsException, 
525                                           InterruptedException, 
526                                           JsonParseException, 
527                                           JsonMappingException, 
528                                           IOException, 
529                                           ChampUnmarshallingException, 
530                                           ChampRelationshipNotExistsException, ChampTransactionException {
531     
532     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
533     // events.
534     testGraph.returnNulls();
535     
536     // Create all of the objects we need to create a partition, and store the partition
537     // in the graph.
538     ChampObject obj1 = ChampObject.create()
539         .ofType("foo")
540         .withKey("123")
541         .withProperty("p1", "v1")
542         .withProperty("p2", "v2")
543         .build();  
544
545     ChampObject obj2 = ChampObject.create()
546         .ofType("bar")
547         .withKey("123")
548         .withProperty("p3", "v3")
549         .build();
550     
551     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
552         .property("property-1", "value-1")
553         .property("property-2", "value-2")
554         .build();
555     
556     ChampPartition partition = ChampPartition.create()
557         .withObject(obj1)
558         .withObject(obj2)
559         .withRelationship(rel)
560         .build();
561     testGraph.storePartition(partition, Optional.empty());
562     
563     // Check our simulated event stream to see if an an event log was produced.
564     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
565     
566     // Validate that we did not get an event from the stream.
567     assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
568   }
569   
570   
571   /**
572    * Validates that store/replace/delete operation against vertex indexes result in the expected
573    * events being published to the event stream.
574    * 
575    * @throws ChampMarshallingException
576    * @throws ChampSchemaViolationException
577    * @throws ChampObjectNotExistsException
578    * @throws InterruptedException
579    * @throws JsonParseException
580    * @throws JsonMappingException
581    * @throws IOException
582    * @throws ChampUnmarshallingException
583    * @throws ChampRelationshipNotExistsException
584    * @throws ChampIndexNotExistsException
585    */
586   @Test
587   public void indexOperationsTest() throws ChampMarshallingException, 
588                                            ChampSchemaViolationException, 
589                                            ChampObjectNotExistsException, 
590                                            InterruptedException, 
591                                            JsonParseException, 
592                                            JsonMappingException, 
593                                            IOException, 
594                                            ChampUnmarshallingException, 
595                                            ChampRelationshipNotExistsException, 
596                                            ChampIndexNotExistsException {
597         
598     // Create an index object and store it in the graph.
599     List<String> fields = new ArrayList<String>();
600     fields.add("myField");   
601     ChampObjectIndex objIndex = ChampObjectIndex.create()
602         .ofName("myIndex")
603         .onType("type")
604         .forFields(fields)
605         .build();
606     testGraph.storeObjectIndex(objIndex);
607     
608     // Retrieve the next event from the event stream and validate that it is what we expect.
609     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
610     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
611     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
612     
613     // Now, delete our partition.
614     testGraph.deleteObjectIndex("myIndex");
615     
616     // Retrieve the next event from the event stream and validate that it is what we expect.
617     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
618     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
619     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
620   }
621   
622   /**
623    * This test validates that performing index operations in the case where the data to be
624    * forwarded to the event stream is unavailable results in no event being generated, but
625    * does not otherwise create issues.
626    * 
627    * @throws ChampMarshallingException
628    * @throws ChampSchemaViolationException
629    * @throws ChampObjectNotExistsException
630    * @throws InterruptedException
631    * @throws JsonParseException
632    * @throws JsonMappingException
633    * @throws IOException
634    * @throws ChampUnmarshallingException
635    * @throws ChampRelationshipNotExistsException
636    * @throws ChampIndexNotExistsException
637    */
638   @Test
639   public void indexOperationsWithNullsTest() throws ChampMarshallingException, 
640                                                     ChampSchemaViolationException, 
641                                                     ChampObjectNotExistsException, 
642                                                     InterruptedException, 
643                                                     JsonParseException, 
644                                                     JsonMappingException, 
645                                                     IOException, 
646                                                     ChampUnmarshallingException, 
647                                                     ChampRelationshipNotExistsException, 
648                                                     ChampIndexNotExistsException {
649     
650     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
651     // events.
652     testGraph.returnNulls();
653     
654     // Create an index object and store it in the graph.
655     List<String> fields = new ArrayList<String>();
656     fields.add("myField");    
657     ChampObjectIndex objIndex = ChampObjectIndex.create()
658         .ofName("myIndex")
659         .onType("type")
660         .forFields(fields)
661         .build();
662     testGraph.storeObjectIndex(objIndex);
663     
664     // Check our simulated event stream to see if an  an event log was produced.
665     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
666     
667     // Now, delete our index.
668     testGraph.deleteObjectIndex("myIndex");
669     
670     // Check our simulated event stream to see if an an event log was produced.
671     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
672     
673     // Validate that we did not get an event from the stream.
674     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
675   }
676   
677   
678   /**
679    * This test validates that performing relationship index operations in the case where 
680    * the data to be forwarded to the event stream is unavailable results in no event being 
681    * generated, but does not otherwise create issues.
682    * 
683    * @throws ChampMarshallingException
684    * @throws ChampSchemaViolationException
685    * @throws ChampObjectNotExistsException
686    * @throws InterruptedException
687    * @throws JsonParseException
688    * @throws JsonMappingException
689    * @throws IOException
690    * @throws ChampUnmarshallingException
691    * @throws ChampRelationshipNotExistsException
692    * @throws ChampIndexNotExistsException
693    */
694   @Test
695   public void relationshipIndexOperationsTest() throws ChampMarshallingException, 
696                                                        ChampSchemaViolationException, 
697                                                        ChampObjectNotExistsException, 
698                                                        InterruptedException, 
699                                                        JsonParseException, 
700                                                        JsonMappingException, 
701                                                        IOException, 
702                                                        ChampUnmarshallingException, 
703                                                        ChampRelationshipNotExistsException, 
704                                                        ChampIndexNotExistsException {
705         
706     // Create a relationship index object and store it in the graph.
707     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
708         .ofName("myIndex")
709         .onType("type")
710         .forField("myField")
711         .build();
712     testGraph.storeRelationshipIndex(relIndex);
713     
714     // Retrieve the next event from the event stream and validate that it is what we expect.
715     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
716     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
717     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
718     
719     // Now, delete our partition.
720     testGraph.deleteRelationshipIndex("myIndex");
721     
722     // Retrieve the next event from the event stream and validate that it is what we expect.
723     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
724     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
725     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
726   }
727   
728   
729   /**
730    * This test validates that performing index operations in the case where the data to be
731    * forwarded to the event stream is unavailable results in no event being generated, but
732    * does not otherwise create issues.
733    * 
734    * @throws ChampMarshallingException
735    * @throws ChampSchemaViolationException
736    * @throws ChampObjectNotExistsException
737    * @throws InterruptedException
738    * @throws JsonParseException
739    * @throws JsonMappingException
740    * @throws IOException
741    * @throws ChampUnmarshallingException
742    * @throws ChampRelationshipNotExistsException
743    * @throws ChampIndexNotExistsException
744    */
745   @Test
746   public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException, 
747                                                                 ChampSchemaViolationException, 
748                                                                 ChampObjectNotExistsException, 
749                                                                 InterruptedException, 
750                                                                 JsonParseException, 
751                                                                 JsonMappingException, 
752                                                                 IOException, 
753                                                                 ChampUnmarshallingException, 
754                                                                 ChampRelationshipNotExistsException, 
755                                                                 ChampIndexNotExistsException {
756     
757     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
758     // events.
759     testGraph.returnNulls();
760     
761     // Create a relationship index object and store it in the graph.
762     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
763         .ofName("myIndex")
764         .onType("type")
765         .forField("myField")
766         .build();
767     
768     testGraph.storeRelationshipIndex(relIndex);
769     
770     // Check our simulated event stream to see if an an event log was produced.
771     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
772     
773     // Now, delete our index.
774     testGraph.deleteRelationshipIndex("myIndex");
775     
776     // Check our simulated event stream to see if an event log was produced.
777     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
778     
779     // Validate that we did not get an event from the stream.
780     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
781   }
782       
783   
784   /**
785    * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which 
786    * we can use to validate that log events get generated without worrying about having a real
787    * underlying graph.
788    */
789   private class TestGraph extends AbstractLoggingChampGraph {
790     
791     /** If set, this causes simulated retrieve operations to fail. */
792     private boolean returnNulls = false;
793     
794     
795     protected TestGraph(Map<String, Object> properties) {
796       super(properties);      
797     }
798
799     public void returnNulls() {
800       returnNulls = true;
801     }
802     
803     @Override 
804     public void shutdown() {
805       if(returnNulls) {
806         publisherPool = null;
807       }
808       super.shutdown();
809     }
810     
811     @Override
812     public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) 
813         throws ChampMarshallingException,
814                ChampSchemaViolationException, 
815                ChampObjectNotExistsException {
816       
817       if(!returnNulls) {
818         return object;
819       } else {
820         return null;
821       }
822     }
823
824     @Override
825     public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) 
826         throws ChampMarshallingException,
827                ChampSchemaViolationException, 
828                ChampObjectNotExistsException {
829       
830       if(!returnNulls) {
831         return object;
832       } else {
833         return null;
834       }
835     }
836
837     @Override
838     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
839       return retrieveObject(key, Optional.empty());
840     }
841     
842     @Override
843     public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
844       
845       if(!returnNulls) {
846         return(Optional.of(ChampObject.create()
847                             .ofType("foo")
848                             .withKey(key)
849                             .build()));  
850       } else {
851         return Optional.empty();
852       }
853     }
854
855     @Override
856     public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
857    
858     }
859
860     @Override
861     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
862       return queryObjects(queryParams, Optional.empty());
863     }
864
865    
866     @Override
867     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
868       // Not used by any tests.
869       return null;
870     }
871
872     @Override
873     public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
874         throws ChampUnmarshallingException, 
875                ChampMarshallingException, 
876                ChampObjectNotExistsException, 
877                ChampSchemaViolationException,
878                ChampRelationshipNotExistsException {
879
880       if(!returnNulls) {
881         return relationship;
882       } else {
883         return null;
884       }
885     }
886
887     @Override
888     public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
889         throws ChampUnmarshallingException, 
890                ChampMarshallingException,
891                ChampSchemaViolationException, 
892                ChampRelationshipNotExistsException {
893
894       if(!returnNulls) {
895         return relationship;
896       } else {
897         return null;
898       }
899     }
900
901     @Override
902     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
903       return retrieveRelationship(key, Optional.empty());
904     }
905     
906     @Override
907     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
908       // Not used by any tests.
909       return null;
910     }
911
912     @Override
913     public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
914       // Not used by any tests.   
915     }
916
917     @Override
918     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
919         throws ChampUnmarshallingException, ChampObjectNotExistsException {
920       return retrieveRelationships(object, Optional.empty());
921     }
922     
923     @Override
924     public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
925         throws ChampUnmarshallingException, ChampObjectNotExistsException {
926       
927       // Not used by any tests.
928       return null;
929     }
930
931     @Override
932     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
933       return queryRelationships(queryParams, Optional.empty());
934     }
935     
936     @Override
937     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
938       
939       // Not used by any tests.
940       return null;
941     }
942
943     @Override
944     public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) 
945         throws ChampSchemaViolationException, 
946                ChampRelationshipNotExistsException,
947                ChampMarshallingException, 
948                ChampObjectNotExistsException {
949
950       if(!returnNulls) {
951         return partition;
952       } else {
953         return null;
954       }
955     }
956
957     @Override
958     public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
959       // Not used by any tests.     
960     }
961
962     @Override
963     public void executeStoreObjectIndex(ChampObjectIndex index) {
964       // Not used by any tests.    
965     }
966
967     @Override
968     public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
969       
970       if(!returnNulls) {
971         List<String> fields = new ArrayList<String>();
972         fields.add("doesnt matter");
973         return Optional.of(ChampObjectIndex.create()
974                             .ofName(indexName)
975                             .onType("doesnt matter")
976                             .forFields(fields)
977                             .build());
978       } else {
979         return Optional.empty();
980       }
981     }
982
983     @Override
984     public Stream<ChampObjectIndex> retrieveObjectIndices() {
985       // Not used by any tests.
986       return null;
987     }
988
989     @Override
990     public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
991       // Not used by any tests.    
992     }
993
994     @Override
995     public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
996       // Not used by any tests.  
997     }
998
999     @Override
1000     public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
1001       if(!returnNulls) {
1002         return Optional.of(ChampRelationshipIndex.create()
1003                             .ofName(indexName)
1004                             .onType("doesnt matter")
1005                             .forField("doesnt matter")
1006                             .build());
1007       } else {
1008         return Optional.empty();
1009       }
1010     }
1011
1012     @Override
1013     public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
1014       // Not used by any tests.
1015       return null;
1016     }
1017
1018     @Override
1019     public void executeDeleteRelationshipIndex(String indexName)
1020         throws ChampIndexNotExistsException {
1021       // Not used by any tests.    
1022     }
1023
1024     @Override
1025     public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1026       // Not used by any tests.    
1027     }
1028
1029     @Override
1030     public ChampSchema retrieveSchema() {
1031       // Not used by any tests.
1032       return null;
1033     }
1034
1035     @Override
1036     public void updateSchema(ChampObjectConstraint objectConstraint)
1037         throws ChampSchemaViolationException {
1038       // Not used by any tests.    
1039     }
1040
1041     @Override
1042     public void updateSchema(ChampRelationshipConstraint schema)
1043         throws ChampSchemaViolationException {
1044       // Not used by any tests.     
1045     }
1046
1047     @Override
1048     public void deleteSchema() {
1049       // Not used by any tests.  
1050     }
1051
1052     @Override
1053     public ChampCapabilities capabilities() {
1054       // Not used by any tests.
1055       return null;
1056     }
1057
1058     @Override
1059     public ChampTransaction openTransaction() {
1060       // Not used by any tests.
1061       return null;
1062     }
1063
1064     @Override
1065     public void commitTransaction(ChampTransaction transaction) {
1066       // Not used by any tests.
1067       
1068     }
1069
1070     @Override
1071     public void rollbackTransaction(ChampTransaction transaction) {
1072       // Not used by any tests.    
1073     }
1074
1075     @Override
1076     public void createDefaultIndexes() {
1077       // TODO Auto-generated method stub
1078       
1079     }
1080   }
1081   
1082   private class InMemoryPublisher implements EventPublisher {
1083
1084     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
1085     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
1086     private boolean failMode=false;
1087     
1088     
1089     public void enterFailMode() {
1090       failMode=true;
1091     }
1092     
1093     @Override
1094     public int sendSync(String partitionKey, String message) throws Exception {
1095       
1096       if(!failMode) {
1097         eventStream.add(message);
1098         return 0;
1099       } else {
1100         failedMsgs.add(message);
1101         throw new IOException("nope");
1102       }
1103     }
1104     
1105     @Override
1106     public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
1107       
1108       for(String msg : messages) {
1109         if(!failMode) {
1110           eventStream.add(msg);
1111           return 0;
1112         } else {
1113           failedMsgs.add(msg);
1114           throw new IOException("nope");
1115         }
1116       }
1117       return 0;
1118     }
1119     
1120     @Override
1121     public int sendSync(String message) throws Exception {
1122       if(!failMode) {
1123         eventStream.add(message);
1124         return 0;
1125       } else {
1126         failedMsgs.add(message);
1127         throw new IOException("nope");
1128       }
1129     }
1130     
1131     @Override
1132     public int sendSync(Collection<String> messages) throws Exception {
1133       
1134       for(String msg : messages) {
1135         if(!failMode) {
1136           eventStream.add(msg);
1137           return 0;
1138         } else {
1139           failedMsgs.add(msg);
1140           throw new IOException("nope");
1141         }
1142       }
1143       return 0;
1144     }
1145     @Override
1146     public void sendAsync(String partitionKey, String message) throws Exception {
1147       if(!failMode) {
1148         eventStream.add(message);
1149       } else {
1150         failedMsgs.add(message);
1151         throw new IOException("nope");
1152       }      
1153     }
1154     @Override
1155     public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
1156       for(String msg : messages) {
1157         if(!failMode) {
1158           eventStream.add(msg);
1159         } else {
1160           failedMsgs.add(msg);
1161           throw new IOException("nope");
1162         }
1163       }     
1164     }
1165     @Override
1166     public void sendAsync(String message) throws Exception {
1167       if(!failMode) {
1168         eventStream.add(message);
1169       } else {
1170         failedMsgs.add(message);
1171         throw new IOException("nope");
1172       }      
1173     }
1174     @Override
1175     public void sendAsync(Collection<String> messages) throws Exception {
1176       for(String msg : messages) {
1177         if(!failMode) {
1178           eventStream.add(msg);
1179         } else {
1180           failedMsgs.add(msg);
1181           throw new IOException("nope");
1182         }
1183       }    
1184     }
1185     
1186     @Override
1187     public void close() throws Exception {
1188       // TODO Auto-generated method stub
1189       
1190     }    
1191   }
1192 }