1f7451ada15d2f0291473ff5a061c9123387651d
[aai/champ.git] / champ-lib / champ-core / src / test / java / org / onap / aai / champcore / event / AbstractLoggingChampGraphTest.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champcore.event;
23
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.*;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Stream;
33
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.onap.aai.champcore.ChampCapabilities;
38 import org.onap.aai.champcore.ChampTransaction;
39 import org.onap.aai.champcore.exceptions.ChampIndexNotExistsException;
40 import org.onap.aai.champcore.exceptions.ChampMarshallingException;
41 import org.onap.aai.champcore.exceptions.ChampObjectNotExistsException;
42 import org.onap.aai.champcore.exceptions.ChampRelationshipNotExistsException;
43 import org.onap.aai.champcore.exceptions.ChampSchemaViolationException;
44 import org.onap.aai.champcore.exceptions.ChampTransactionException;
45 import org.onap.aai.champcore.exceptions.ChampUnmarshallingException;
46 import org.onap.aai.champcore.model.ChampObject;
47 import org.onap.aai.champcore.model.ChampObjectConstraint;
48 import org.onap.aai.champcore.model.ChampObjectIndex;
49 import org.onap.aai.champcore.model.ChampPartition;
50 import org.onap.aai.champcore.model.ChampRelationship;
51 import org.onap.aai.champcore.model.ChampRelationshipConstraint;
52 import org.onap.aai.champcore.model.ChampRelationshipIndex;
53 import org.onap.aai.champcore.model.ChampSchema;
54
55 import org.onap.aai.event.api.EventPublisher;
56 import com.fasterxml.jackson.core.JsonParseException;
57 import com.fasterxml.jackson.databind.JsonMappingException;
58
59
60 public class AbstractLoggingChampGraphTest {
61
62   /** Event stream producer stub. */
63   private InMemoryPublisher producer;
64   
65   /** In memory graph for testing purposes. */
66   private TestGraph testGraph;
67   
68   
69   /**
70    * Perform any setup tasks that need to be done prior to each test.
71    */
72   @Before
73   public void setup() {
74     
75     // Instantiate an event stream producer stub to use in our tests.
76     producer = new InMemoryPublisher();
77     
78     // Instantiate an 'in-memory' graph for test purposes.
79     Map<String, Object> graphProperties = new HashMap<String, Object>();
80     graphProperties.put("champcore.event.stream.hosts", "myeventstreamhost");
81     graphProperties.put("champcore.event.stream.batch-size", 1);
82     graphProperties.put("champcore.event.stream.publisher", producer);
83     testGraph = new TestGraph(graphProperties);
84   }
85   
86   
87   /**
88    * Perform any cleanup that needs to be done after each test.
89    * 
90    * @throws Exception 
91    */
92   @After
93   public void tearDown() throws Exception {
94     
95     // Close our stubbed producer and graph.
96     producer.close();
97     testGraph.shutdown();
98   }
99   
100  
101   /**
102    * Validates that store/replace/delete operation against vertices result in the expected events
103    * being published to the event stream.
104    * 
105    * @throws ChampMarshallingException
106    * @throws ChampSchemaViolationException
107    * @throws ChampObjectNotExistsException
108    * @throws InterruptedException
109    * @throws JsonParseException
110    * @throws JsonMappingException
111    * @throws IOException
112    * @throws ChampTransactionException 
113    */
114   @Test
115   public void vertexOperationsEmptyTransactionsTest() throws ChampMarshallingException, 
116                                                              ChampSchemaViolationException, 
117                                                              ChampObjectNotExistsException, 
118                                                              InterruptedException, 
119                                                              JsonParseException, 
120                                                              JsonMappingException, 
121                                             IOException, 
122                                             ChampTransactionException {
123             
124     // Create a vertex and store it in the graph data store.
125     ChampObject obj1 = ChampObject.create()
126         .ofType("foo")
127         .withKey("123")
128         .withProperty("p1", "v1")
129         .withProperty("p2", "v2")
130         .build();  
131     testGraph.storeObject(obj1, Optional.empty());
132     
133     // Retrieve the next event from the event stream and validate that it is what we expect.
134     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
135     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
136     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
137   
138     
139     
140     // Create a new vertex based on the one that we already created.
141     ChampObject obj2 = ChampObject.create()
142         .from(obj1)
143         .withKey("123")
144         .withProperty("p3", "v3")
145         .build();
146     
147     // Now, try doing a replace operation.
148     testGraph.replaceObject(obj2, Optional.empty());
149     
150     
151     
152     // Retrieve the next event from the event stream and validate that it is what we expect.
153     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
154     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
155     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
156     
157     // Finally, delete the vertex.
158     testGraph.deleteObject("123", Optional.empty());
159     
160     // Retrieve the next event from the event stream and validate that it is what we expect.
161     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
162     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
163     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
164   }
165   
166   @Test
167   public void vertexOperationsLegacyTest2() throws ChampMarshallingException, 
168                                                    ChampSchemaViolationException, 
169                                                    ChampObjectNotExistsException, 
170                                                    InterruptedException, 
171                                                    JsonParseException, 
172                                                    JsonMappingException, 
173                                                    IOException, 
174                                                    ChampTransactionException {
175             
176     // Create a vertex and store it in the graph data store.
177     ChampObject obj1 = ChampObject.create()
178         .ofType("foo")
179         .withKey("123")
180         .withProperty("p1", "v1")
181         .withProperty("p2", "v2")
182         .build();  
183     testGraph.storeObject(obj1);
184     
185     // Retrieve the next event from the event stream and validate that it is what we expect.
186     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
187     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
188     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
189   
190     
191     
192     // Create a new vertex based on the one that we already created.
193     ChampObject obj2 = ChampObject.create()
194         .from(obj1)
195         .withKey("123")
196         .withProperty("p3", "v3")
197         .build();
198     
199     // Now, try doing a replace operation.
200     testGraph.replaceObject(obj2);
201     
202     
203     
204     // Retrieve the next event from the event stream and validate that it is what we expect.
205     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
206     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
207     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
208     
209     // Finally, delete the vertex.
210     testGraph.deleteObject("123");
211     
212     // Retrieve the next event from the event stream and validate that it is what we expect.
213     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
214     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
215     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
216   }
217   
218   /**
219    * This test validates that performing vertex operations in the case where the data to be
220    * forwarded to the event stream is unavailable results in no event being generated, but
221    * does not otherwise create issues.
222    * 
223    * @throws ChampMarshallingException
224    * @throws ChampSchemaViolationException
225    * @throws ChampObjectNotExistsException
226    * @throws InterruptedException
227    * @throws JsonParseException
228    * @throws JsonMappingException
229    * @throws IOException
230    * @throws ChampTransactionException 
231    */
232   @Test
233   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
234                                                      ChampSchemaViolationException, 
235                                                      ChampObjectNotExistsException, 
236                                                      InterruptedException, 
237                                                      JsonParseException, 
238                                                      JsonMappingException, 
239                                                      IOException, ChampTransactionException {
240             
241     // Setup our test graph to simulate failures to retrieve data from the graph data store.
242     testGraph.returnNulls();
243     
244     // Create a vertex and store it in the graph data store.
245     ChampObject obj1 = ChampObject.create()
246         .ofType("foo")
247         .withKey("123")
248         .withProperty("p1", "v1")
249         .withProperty("p2", "v2")
250         .build();  
251     testGraph.storeObject(obj1, Optional.empty());
252
253     // Check our simulated event stream to verify that an event log was produced.
254     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
255     
256     // Validate that we did not get an event from the stream.
257     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
258     
259     // Create a new vertex based on the one that we already created.
260     ChampObject obj2 = ChampObject.create()
261         .from(obj1)
262         .withKey("123")
263         .withProperty("p3", "v3")
264         .build();
265     
266     // Now, try doing a replace operation.
267     testGraph.replaceObject(obj2, Optional.empty());
268     
269     // Check our simulated event stream to see if an event log was not produced.
270     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
271     
272     // Validate that we did not get an event from the stream.
273     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
274     
275     // Finally, delete the vertex.
276     testGraph.deleteObject("123", Optional.empty());
277     
278     // Check our simulated event stream to see if an event log was not produced.
279     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
280     
281     // Validate that we did not get an event from the stream.
282     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
283   }
284   
285   
286   /**
287    * Validates that store/replace/delete operation against edges result in the expected events
288    * being published to the event stream.
289    *
290    * @throws ChampMarshallingException
291    * @throws ChampSchemaViolationException
292    * @throws ChampObjectNotExistsException
293    * @throws InterruptedException
294    * @throws JsonParseException
295    * @throws JsonMappingException
296    * @throws IOException
297    * @throws ChampUnmarshallingException
298    * @throws ChampRelationshipNotExistsException
299    * @throws ChampTransactionException 
300    */
301   @Test
302   public void edgeOperationsTest() throws ChampMarshallingException, 
303                                           ChampSchemaViolationException, 
304                                           ChampObjectNotExistsException, 
305                                           InterruptedException, 
306                                           JsonParseException, 
307                                           JsonMappingException, 
308                                           IOException, 
309                                           ChampUnmarshallingException, 
310                                           ChampRelationshipNotExistsException, ChampTransactionException {
311     
312     // Create two vertices to act as the end points of our edge.
313     ChampObject obj1 = ChampObject.create()
314         .ofType("foo")
315         .withKey("123")
316         .withProperty("p1", "v1")
317         .withProperty("p2", "v2")
318         .build();  
319
320     ChampObject obj2 = ChampObject.create()
321         .ofType("bar")
322         .withKey("123")
323         .withProperty("p3", "v3")
324         .build();
325     
326     // Now, create an edge object and write it to the graph data store.
327     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
328         .property("property-1", "value-1")
329         .property("property-2", "value-2")
330         .build();
331     testGraph.storeRelationship(rel, Optional.empty());
332     
333     // Retrieve the next event from the event stream and validate that it is what we expect.
334     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
335     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
336     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
337     
338     // Now, create another edge object based on the one we just wrote, and use it to perform
339     // a replace operation.
340     ChampRelationship rel2 = ChampRelationship.create()
341         .from(rel)
342         .withKey("123")
343         .withProperty("property-3", "value-3")
344         .build();
345     testGraph.replaceRelationship(rel2, Optional.empty());
346
347     // Retrieve the next event from the event stream and validate that it is what we expect.
348     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
349     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
350     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
351     
352     // Finally, delete our edge.
353     testGraph.deleteRelationship(rel2, Optional.empty());
354     
355     // Retrieve the next event from the event stream and validate that it is what we expect.
356     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
357     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
358     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
359   }
360   
361   
362   /**
363    * This test validates that performing edge operations in the case where the data to be
364    * forwarded to the event stream is unavailable results in no event being generated, but
365    * does not otherwise create issues.
366    * 
367    * @throws ChampMarshallingException
368    * @throws ChampSchemaViolationException
369    * @throws ChampObjectNotExistsException
370    * @throws InterruptedException
371    * @throws JsonParseException
372    * @throws JsonMappingException
373    * @throws IOException
374    * @throws ChampUnmarshallingException
375    * @throws ChampRelationshipNotExistsException
376    * @throws ChampTransactionException 
377    */
378   @Test
379   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
380                                                    ChampSchemaViolationException, 
381                                                    ChampObjectNotExistsException, 
382                                                    InterruptedException, 
383                                                    JsonParseException, 
384                                                    JsonMappingException, 
385                                                    IOException, 
386                                                    ChampUnmarshallingException, 
387                                                    ChampRelationshipNotExistsException, ChampTransactionException {
388     
389     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
390     // events.
391     testGraph.returnNulls();
392     
393     // Create two vertices to act as the endpoints of our edge.
394     ChampObject obj1 = ChampObject.create()
395         .ofType("foo")
396         .withKey("123")
397         .withProperty("p1", "v1")
398         .withProperty("p2", "v2")
399         .build();  
400
401     ChampObject obj2 = ChampObject.create()
402         .ofType("bar")
403         .withKey("123")
404         .withProperty("p3", "v3")
405         .build();
406     
407     // Now, create an edge object and write it to the graph data store.
408     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
409         .property("property-1", "value-1")
410         .property("property-2", "value-2")
411         .build();
412     testGraph.storeRelationship(rel, Optional.empty());
413     
414     // Check our simulated event stream to see if an event log was produced.
415     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
416     
417     // Validate that we did not get an event from the stream.
418     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
419         
420     // Now, create another edge object based on the one we just wrote, and use it to perform
421     // a replace operation.
422     ChampRelationship rel2 = ChampRelationship.create()
423         .from(rel)
424         .withKey("123")
425         .withProperty("property-3", "value-3")
426         .build();
427     testGraph.replaceRelationship(rel2, Optional.empty());
428     
429     // Check our simulated event stream to see if an event log was produced.
430     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
431     
432     // Validate that we did not get an event from the stream.
433     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);   
434   }
435   
436   
437   /**
438    * Validates that store/replace/delete operation against partitions result in the expected events
439    * being published to the event stream.
440    *
441    * @throws ChampMarshallingException
442    * @throws ChampSchemaViolationException
443    * @throws ChampObjectNotExistsException
444    * @throws InterruptedException
445    * @throws JsonParseException
446    * @throws JsonMappingException
447    * @throws IOException
448    * @throws ChampUnmarshallingException
449    * @throws ChampRelationshipNotExistsException
450    * @throws ChampTransactionException 
451    */
452   @Test
453   public void partitionOperationsTest() throws ChampMarshallingException, 
454                                                ChampSchemaViolationException, 
455                                                ChampObjectNotExistsException, 
456                                                InterruptedException, 
457                                                JsonParseException, 
458                                                JsonMappingException, 
459                                                IOException, 
460                                                ChampUnmarshallingException, 
461                                                ChampRelationshipNotExistsException, ChampTransactionException {
462     
463     // Create the vertices and edge objects that we need to create a partition.
464     ChampObject obj1 = ChampObject.create()
465         .ofType("foo")
466         .withKey("123")
467         .withProperty("p1", "v1")
468         .withProperty("p2", "v2")
469         .build();  
470
471     ChampObject obj2 = ChampObject.create()
472         .ofType("bar")
473         .withKey("123")
474         .withProperty("p3", "v3")
475         .build();
476     
477     // Now, create an edge object and write it to the graph data store.
478     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
479         .property("property-1", "value-1")
480         .property("property-2", "value-2")
481         .build();
482     
483     // Now, create our partition object and store it in the graph.
484     ChampPartition partition = ChampPartition.create()
485         .withObject(obj1)
486         .withObject(obj2)
487         .withRelationship(rel)
488         .build();
489     testGraph.storePartition(partition, Optional.empty());
490     
491     // Retrieve the next event from the event stream and validate that it is what we expect.
492     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
493     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
494     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
495
496     // Now, delete our partition.
497     testGraph.deletePartition(partition, Optional.empty());
498     
499     // Retrieve the next event from the event stream and validate that it is what we expect.
500     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
501     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
502     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
503   }
504   
505   
506   /**
507    * This test validates that performing partition operations in the case where the data to be
508    * forwarded to the event stream is unavailable results in no event being generated, but
509    * does not otherwise create issues.
510    * 
511    * @throws ChampMarshallingException
512    * @throws ChampSchemaViolationException
513    * @throws ChampObjectNotExistsException
514    * @throws InterruptedException
515    * @throws JsonParseException
516    * @throws JsonMappingException
517    * @throws IOException
518    * @throws ChampUnmarshallingException
519    * @throws ChampRelationshipNotExistsException
520    * @throws ChampTransactionException 
521    */
522   @Test
523   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
524                                           ChampSchemaViolationException, 
525                                           ChampObjectNotExistsException, 
526                                           InterruptedException, 
527                                           JsonParseException, 
528                                           JsonMappingException, 
529                                           IOException, 
530                                           ChampUnmarshallingException, 
531                                           ChampRelationshipNotExistsException, ChampTransactionException {
532     
533     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
534     // events.
535     testGraph.returnNulls();
536     
537     // Create all of the objects we need to create a partition, and store the partition
538     // in the graph.
539     ChampObject obj1 = ChampObject.create()
540         .ofType("foo")
541         .withKey("123")
542         .withProperty("p1", "v1")
543         .withProperty("p2", "v2")
544         .build();  
545
546     ChampObject obj2 = ChampObject.create()
547         .ofType("bar")
548         .withKey("123")
549         .withProperty("p3", "v3")
550         .build();
551     
552     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
553         .property("property-1", "value-1")
554         .property("property-2", "value-2")
555         .build();
556     
557     ChampPartition partition = ChampPartition.create()
558         .withObject(obj1)
559         .withObject(obj2)
560         .withRelationship(rel)
561         .build();
562     testGraph.storePartition(partition, Optional.empty());
563     
564     // Check our simulated event stream to see if an an event log was produced.
565     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
566     
567     // Validate that we did not get an event from the stream.
568     assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
569   }
570   
571   
572   /**
573    * Validates that store/replace/delete operation against vertex indexes result in the expected
574    * events being published to the event stream.
575    * 
576    * @throws ChampMarshallingException
577    * @throws ChampSchemaViolationException
578    * @throws ChampObjectNotExistsException
579    * @throws InterruptedException
580    * @throws JsonParseException
581    * @throws JsonMappingException
582    * @throws IOException
583    * @throws ChampUnmarshallingException
584    * @throws ChampRelationshipNotExistsException
585    * @throws ChampIndexNotExistsException
586    */
587   @Test
588   public void indexOperationsTest() throws ChampMarshallingException, 
589                                            ChampSchemaViolationException, 
590                                            ChampObjectNotExistsException, 
591                                            InterruptedException, 
592                                            JsonParseException, 
593                                            JsonMappingException, 
594                                            IOException, 
595                                            ChampUnmarshallingException, 
596                                            ChampRelationshipNotExistsException, 
597                                            ChampIndexNotExistsException {
598         
599     // Create an index object and store it in the graph.
600     ChampObjectIndex objIndex = ChampObjectIndex.create()
601         .ofName("myIndex")
602         .onType("type")
603         .forField("myField")
604         .build();
605     testGraph.storeObjectIndex(objIndex);
606     
607     // Retrieve the next event from the event stream and validate that it is what we expect.
608     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
609     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
610     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
611     
612     // Now, delete our partition.
613     testGraph.deleteObjectIndex("myIndex");
614     
615     // Retrieve the next event from the event stream and validate that it is what we expect.
616     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
617     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
618     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
619   }
620   
621   /**
622    * This test validates that performing index operations in the case where the data to be
623    * forwarded to the event stream is unavailable results in no event being generated, but
624    * does not otherwise create issues.
625    * 
626    * @throws ChampMarshallingException
627    * @throws ChampSchemaViolationException
628    * @throws ChampObjectNotExistsException
629    * @throws InterruptedException
630    * @throws JsonParseException
631    * @throws JsonMappingException
632    * @throws IOException
633    * @throws ChampUnmarshallingException
634    * @throws ChampRelationshipNotExistsException
635    * @throws ChampIndexNotExistsException
636    */
637   @Test
638   public void indexOperationsWithNullsTest() throws ChampMarshallingException, 
639                                                     ChampSchemaViolationException, 
640                                                     ChampObjectNotExistsException, 
641                                                     InterruptedException, 
642                                                     JsonParseException, 
643                                                     JsonMappingException, 
644                                                     IOException, 
645                                                     ChampUnmarshallingException, 
646                                                     ChampRelationshipNotExistsException, 
647                                                     ChampIndexNotExistsException {
648     
649     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
650     // events.
651     testGraph.returnNulls();
652     
653     // Create an index object and store it in the graph.
654     ChampObjectIndex objIndex = ChampObjectIndex.create()
655         .ofName("myIndex")
656         .onType("type")
657         .forField("myField")
658         .build();
659     testGraph.storeObjectIndex(objIndex);
660     
661     // Check our simulated event stream to see if an  an event log was produced.
662     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
663     
664     // Now, delete our index.
665     testGraph.deleteObjectIndex("myIndex");
666     
667     // Check our simulated event stream to see if an an event log was produced.
668     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
669     
670     // Validate that we did not get an event from the stream.
671     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
672   }
673   
674   
675   /**
676    * This test validates that performing relationship index operations in the case where 
677    * the data to be forwarded to the event stream is unavailable results in no event being 
678    * generated, but does not otherwise create issues.
679    * 
680    * @throws ChampMarshallingException
681    * @throws ChampSchemaViolationException
682    * @throws ChampObjectNotExistsException
683    * @throws InterruptedException
684    * @throws JsonParseException
685    * @throws JsonMappingException
686    * @throws IOException
687    * @throws ChampUnmarshallingException
688    * @throws ChampRelationshipNotExistsException
689    * @throws ChampIndexNotExistsException
690    */
691   @Test
692   public void relationshipIndexOperationsTest() throws ChampMarshallingException, 
693                                                        ChampSchemaViolationException, 
694                                                        ChampObjectNotExistsException, 
695                                                        InterruptedException, 
696                                                        JsonParseException, 
697                                                        JsonMappingException, 
698                                                        IOException, 
699                                                        ChampUnmarshallingException, 
700                                                        ChampRelationshipNotExistsException, 
701                                                        ChampIndexNotExistsException {
702         
703     // Create a relationship index object and store it in the graph.
704     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
705         .ofName("myIndex")
706         .onType("type")
707         .forField("myField")
708         .build();
709     testGraph.storeRelationshipIndex(relIndex);
710     
711     // Retrieve the next event from the event stream and validate that it is what we expect.
712     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
713     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
714     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
715     
716     // Now, delete our partition.
717     testGraph.deleteRelationshipIndex("myIndex");
718     
719     // Retrieve the next event from the event stream and validate that it is what we expect.
720     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
721     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
722     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
723   }
724   
725   
726   /**
727    * This test validates that performing index operations in the case where the data to be
728    * forwarded to the event stream is unavailable results in no event being generated, but
729    * does not otherwise create issues.
730    * 
731    * @throws ChampMarshallingException
732    * @throws ChampSchemaViolationException
733    * @throws ChampObjectNotExistsException
734    * @throws InterruptedException
735    * @throws JsonParseException
736    * @throws JsonMappingException
737    * @throws IOException
738    * @throws ChampUnmarshallingException
739    * @throws ChampRelationshipNotExistsException
740    * @throws ChampIndexNotExistsException
741    */
742   @Test
743   public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException, 
744                                                                 ChampSchemaViolationException, 
745                                                                 ChampObjectNotExistsException, 
746                                                                 InterruptedException, 
747                                                                 JsonParseException, 
748                                                                 JsonMappingException, 
749                                                                 IOException, 
750                                                                 ChampUnmarshallingException, 
751                                                                 ChampRelationshipNotExistsException, 
752                                                                 ChampIndexNotExistsException {
753     
754     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
755     // events.
756     testGraph.returnNulls();
757     
758     // Create a relationship index object and store it in the graph.
759     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
760         .ofName("myIndex")
761         .onType("type")
762         .forField("myField")
763         .build();
764     
765     testGraph.storeRelationshipIndex(relIndex);
766     
767     // Check our simulated event stream to see if an an event log was produced.
768     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
769     
770     // Now, delete our index.
771     testGraph.deleteRelationshipIndex("myIndex");
772     
773     // Check our simulated event stream to see if an event log was produced.
774     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
775     
776     // Validate that we did not get an event from the stream.
777     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
778   }
779       
780   
781   /**
782    * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which 
783    * we can use to validate that log events get generated without worrying about having a real
784    * underlying graph.
785    */
786   private class TestGraph extends AbstractLoggingChampGraph {
787     
788     /** If set, this causes simulated retrieve operations to fail. */
789     private boolean returnNulls = false;
790     
791     
792     protected TestGraph(Map<String, Object> properties) {
793       super(properties);      
794     }
795
796     public void returnNulls() {
797       returnNulls = true;
798     }
799     
800     @Override 
801     public void shutdown() {
802       if(returnNulls) {
803         publisherPool = null;
804       }
805       super.shutdown();
806     }
807     
808     @Override
809     public ChampObject executeStoreObject(ChampObject object, Optional<ChampTransaction> transaction) 
810         throws ChampMarshallingException,
811                ChampSchemaViolationException, 
812                ChampObjectNotExistsException {
813       
814       if(!returnNulls) {
815         return object;
816       } else {
817         return null;
818       }
819     }
820
821     @Override
822     public ChampObject executeReplaceObject(ChampObject object, Optional<ChampTransaction> transaction) 
823         throws ChampMarshallingException,
824                ChampSchemaViolationException, 
825                ChampObjectNotExistsException {
826       
827       if(!returnNulls) {
828         return object;
829       } else {
830         return null;
831       }
832     }
833
834     @Override
835     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
836       return retrieveObject(key, Optional.empty());
837     }
838     
839     @Override
840     public Optional<ChampObject> retrieveObject(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
841       
842       if(!returnNulls) {
843         return(Optional.of(ChampObject.create()
844                             .ofType("foo")
845                             .withKey(key)
846                             .build()));  
847       } else {
848         return Optional.empty();
849       }
850     }
851
852     @Override
853     public void executeDeleteObject(Object key, Optional<ChampTransaction> transaction) throws ChampObjectNotExistsException {
854    
855     }
856
857     @Override
858     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
859       return queryObjects(queryParams, Optional.empty());
860     }
861
862    
863     @Override
864     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
865       // Not used by any tests.
866       return null;
867     }
868
869     @Override
870     public ChampRelationship executeStoreRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) 
871         throws ChampUnmarshallingException, 
872                ChampMarshallingException, 
873                ChampObjectNotExistsException, 
874                ChampSchemaViolationException,
875                ChampRelationshipNotExistsException {
876
877       if(!returnNulls) {
878         return relationship;
879       } else {
880         return null;
881       }
882     }
883
884     @Override
885     public ChampRelationship executeReplaceRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction)
886         throws ChampUnmarshallingException, 
887                ChampMarshallingException,
888                ChampSchemaViolationException, 
889                ChampRelationshipNotExistsException {
890
891       if(!returnNulls) {
892         return relationship;
893       } else {
894         return null;
895       }
896     }
897
898     @Override
899     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
900       return retrieveRelationship(key, Optional.empty());
901     }
902     
903     @Override
904     public Optional<ChampRelationship> retrieveRelationship(Object key, Optional<ChampTransaction> transaction) throws ChampUnmarshallingException {
905       // Not used by any tests.
906       return null;
907     }
908
909     @Override
910     public void executeDeleteRelationship(ChampRelationship relationship, Optional<ChampTransaction> transaction) throws ChampRelationshipNotExistsException {
911       // Not used by any tests.   
912     }
913
914     @Override
915     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
916         throws ChampUnmarshallingException, ChampObjectNotExistsException {
917       return retrieveRelationships(object, Optional.empty());
918     }
919     
920     @Override
921     public Stream<ChampRelationship> retrieveRelationships(ChampObject object, Optional<ChampTransaction> transaction)
922         throws ChampUnmarshallingException, ChampObjectNotExistsException {
923       
924       // Not used by any tests.
925       return null;
926     }
927
928     @Override
929     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
930       return queryRelationships(queryParams, Optional.empty());
931     }
932     
933     @Override
934     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams, Optional<ChampTransaction> transaction) {
935       
936       // Not used by any tests.
937       return null;
938     }
939
940     @Override
941     public ChampPartition executeStorePartition(ChampPartition partition, Optional<ChampTransaction> transaction) 
942         throws ChampSchemaViolationException, 
943                ChampRelationshipNotExistsException,
944                ChampMarshallingException, 
945                ChampObjectNotExistsException {
946
947       if(!returnNulls) {
948         return partition;
949       } else {
950         return null;
951       }
952     }
953
954     @Override
955     public void executeDeletePartition(ChampPartition graph, Optional<ChampTransaction> transaction) {
956       // Not used by any tests.     
957     }
958
959     @Override
960     public void executeStoreObjectIndex(ChampObjectIndex index) {
961       // Not used by any tests.    
962     }
963
964     @Override
965     public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
966       
967       if(!returnNulls) {
968         return Optional.of(ChampObjectIndex.create()
969                             .ofName(indexName)
970                             .onType("doesnt matter")
971                             .forField("doesnt matter")
972                             .build());
973       } else {
974         return Optional.empty();
975       }
976     }
977
978     @Override
979     public Stream<ChampObjectIndex> retrieveObjectIndices() {
980       // Not used by any tests.
981       return null;
982     }
983
984     @Override
985     public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
986       // Not used by any tests.    
987     }
988
989     @Override
990     public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
991       // Not used by any tests.  
992     }
993
994     @Override
995     public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
996       if(!returnNulls) {
997         return Optional.of(ChampRelationshipIndex.create()
998                             .ofName(indexName)
999                             .onType("doesnt matter")
1000                             .forField("doesnt matter")
1001                             .build());
1002       } else {
1003         return Optional.empty();
1004       }
1005     }
1006
1007     @Override
1008     public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
1009       // Not used by any tests.
1010       return null;
1011     }
1012
1013     @Override
1014     public void executeDeleteRelationshipIndex(String indexName)
1015         throws ChampIndexNotExistsException {
1016       // Not used by any tests.    
1017     }
1018
1019     @Override
1020     public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
1021       // Not used by any tests.    
1022     }
1023
1024     @Override
1025     public ChampSchema retrieveSchema() {
1026       // Not used by any tests.
1027       return null;
1028     }
1029
1030     @Override
1031     public void updateSchema(ChampObjectConstraint objectConstraint)
1032         throws ChampSchemaViolationException {
1033       // Not used by any tests.    
1034     }
1035
1036     @Override
1037     public void updateSchema(ChampRelationshipConstraint schema)
1038         throws ChampSchemaViolationException {
1039       // Not used by any tests.     
1040     }
1041
1042     @Override
1043     public void deleteSchema() {
1044       // Not used by any tests.  
1045     }
1046
1047     @Override
1048     public ChampCapabilities capabilities() {
1049       // Not used by any tests.
1050       return null;
1051     }
1052
1053     @Override
1054     public ChampTransaction openTransaction() {
1055       // Not used by any tests.
1056       return null;
1057     }
1058
1059     @Override
1060     public void commitTransaction(ChampTransaction transaction) {
1061       // Not used by any tests.
1062       
1063     }
1064
1065     @Override
1066     public void rollbackTransaction(ChampTransaction transaction) {
1067       // Not used by any tests.    
1068     }
1069   }
1070   
1071   private class InMemoryPublisher implements EventPublisher {
1072
1073     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
1074     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
1075     private boolean failMode=false;
1076     
1077     
1078     public void enterFailMode() {
1079       failMode=true;
1080     }
1081     
1082     @Override
1083     public int sendSync(String partitionKey, String message) throws Exception {
1084       
1085       if(!failMode) {
1086         eventStream.add(message);
1087         return 0;
1088       } else {
1089         failedMsgs.add(message);
1090         throw new IOException("nope");
1091       }
1092     }
1093     
1094     @Override
1095     public int sendSync(String partitionKey, Collection<String> messages) throws Exception {
1096       
1097       for(String msg : messages) {
1098         if(!failMode) {
1099           eventStream.add(msg);
1100           return 0;
1101         } else {
1102           failedMsgs.add(msg);
1103           throw new IOException("nope");
1104         }
1105       }
1106       return 0;
1107     }
1108     
1109     @Override
1110     public int sendSync(String message) throws Exception {
1111       if(!failMode) {
1112         eventStream.add(message);
1113         return 0;
1114       } else {
1115         failedMsgs.add(message);
1116         throw new IOException("nope");
1117       }
1118     }
1119     
1120     @Override
1121     public int sendSync(Collection<String> messages) throws Exception {
1122       
1123       for(String msg : messages) {
1124         if(!failMode) {
1125           eventStream.add(msg);
1126           return 0;
1127         } else {
1128           failedMsgs.add(msg);
1129           throw new IOException("nope");
1130         }
1131       }
1132       return 0;
1133     }
1134     @Override
1135     public void sendAsync(String partitionKey, String message) throws Exception {
1136       if(!failMode) {
1137         eventStream.add(message);
1138       } else {
1139         failedMsgs.add(message);
1140         throw new IOException("nope");
1141       }      
1142     }
1143     @Override
1144     public void sendAsync(String partitionKey, Collection<String> messages) throws Exception {
1145       for(String msg : messages) {
1146         if(!failMode) {
1147           eventStream.add(msg);
1148         } else {
1149           failedMsgs.add(msg);
1150           throw new IOException("nope");
1151         }
1152       }     
1153     }
1154     @Override
1155     public void sendAsync(String message) throws Exception {
1156       if(!failMode) {
1157         eventStream.add(message);
1158       } else {
1159         failedMsgs.add(message);
1160         throw new IOException("nope");
1161       }      
1162     }
1163     @Override
1164     public void sendAsync(Collection<String> messages) throws Exception {
1165       for(String msg : messages) {
1166         if(!failMode) {
1167           eventStream.add(msg);
1168         } else {
1169           failedMsgs.add(msg);
1170           throw new IOException("nope");
1171         }
1172       }    
1173     }
1174     
1175     @Override
1176     public void close() throws Exception {
1177       // TODO Auto-generated method stub
1178       
1179     }    
1180   }
1181 }