Renaming openecomp to onap
[aai/champ.git] / src / test / java / org / onap / aai / champ / event / AbstractLoggingChampGraphTest.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.event;
23
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.stream.Stream;
36
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.onap.aai.champ.ChampCapabilities;
41 import org.onap.aai.champ.event.AbstractLoggingChampGraph;
42 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
43 import org.onap.aai.champ.exceptions.ChampMarshallingException;
44 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
45 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
46 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
47 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
48 import org.onap.aai.champ.model.ChampObject;
49 import org.onap.aai.champ.model.ChampObjectConstraint;
50 import org.onap.aai.champ.model.ChampObjectIndex;
51 import org.onap.aai.champ.model.ChampPartition;
52 import org.onap.aai.champ.model.ChampRelationship;
53 import org.onap.aai.champ.model.ChampRelationshipConstraint;
54 import org.onap.aai.champ.model.ChampRelationshipIndex;
55 import org.onap.aai.champ.model.ChampSchema;
56 import org.slf4j.Logger;
57
58 import com.att.nsa.cambria.client.CambriaPublisher;
59 import com.fasterxml.jackson.core.JsonParseException;
60 import com.fasterxml.jackson.databind.JsonMappingException;
61
62
63 public class AbstractLoggingChampGraphTest {
64
65   /** Event stream producer stub. */
66   private InMemoryPublisher producer;
67   
68   /** In memory graph for testing purposes. */
69   private TestGraph testGraph;
70   
71   
72   /**
73    * Perform any setup tasks that need to be done prior to each test.
74    */
75   @Before
76   public void setup() {
77     
78     // Instantiate an event stream producer stub to use in our tests.
79     producer = new InMemoryPublisher();
80     
81     // Instantiate an 'in-memory' graph for test purposes.
82     Map<String, Object> graphProperties = new HashMap<String, Object>();
83     graphProperties.put("champ.event.stream.hosts", "myeventstreamhost");
84     graphProperties.put("champ.event.stream.batch-size", 1);
85     testGraph = new TestGraph(graphProperties, producer);
86   }
87   
88   
89   /**
90    * Perform any cleanup that needs to be done after each test.
91    */
92   @After
93   public void tearDown() {
94     
95     // Close our stubbed producer and graph.
96     producer.close();
97     testGraph.shutdown();
98   }
99   
100  
101   /**
102    * Validates that store/replace/delete operation against vertices result in the expected events
103    * being published to the event stream.
104    * 
105    * @throws ChampMarshallingException
106    * @throws ChampSchemaViolationException
107    * @throws ChampObjectNotExistsException
108    * @throws InterruptedException
109    * @throws JsonParseException
110    * @throws JsonMappingException
111    * @throws IOException
112    */
113   @Test
114   public void vertexOperationsTest() throws ChampMarshallingException, 
115                                             ChampSchemaViolationException, 
116                                             ChampObjectNotExistsException, 
117                                             InterruptedException, 
118                                             JsonParseException, 
119                                             JsonMappingException, 
120                                             IOException {
121             
122     // Create a vertex and store it in the graph data store.
123     ChampObject obj1 = ChampObject.create()
124         .ofType("foo")
125         .withKey("123")
126         .withProperty("p1", "v1")
127         .withProperty("p2", "v2")
128         .build();  
129     testGraph.storeObject(obj1);
130
131     // Retrieve the next event from the event stream and validate that it is what we expect.
132     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
133     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
134     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
135   
136     // Create a new vertex based on the one that we already created.
137     ChampObject obj2 = ChampObject.create()
138         .from(obj1)
139         .withKey("123")
140         .withProperty("p3", "v3")
141         .build();
142     
143     // Now, try doing a replace operation.
144     testGraph.replaceObject(obj2);
145     
146     // Retrieve the next event from the event stream and validate that it is what we expect.
147     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
148     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
149     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
150     
151     // Finally, delete the vertex.
152     testGraph.deleteObject("123");
153     
154     // Retrieve the next event from the event stream and validate that it is what we expect.
155     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
156     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
157     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
158   }
159   
160   
161   /**
162    * This test validates that performing vertex operations in the case where the data to be
163    * forwarded to the event stream is unavailable results in no event being generated, but
164    * does not otherwise create issues.
165    * 
166    * @throws ChampMarshallingException
167    * @throws ChampSchemaViolationException
168    * @throws ChampObjectNotExistsException
169    * @throws InterruptedException
170    * @throws JsonParseException
171    * @throws JsonMappingException
172    * @throws IOException
173    */
174   @Test
175   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
176                                                      ChampSchemaViolationException, 
177                                                      ChampObjectNotExistsException, 
178                                                      InterruptedException, 
179                                                      JsonParseException, 
180                                                      JsonMappingException, 
181                                                      IOException {
182             
183     // Setup our test graph to simulate failures to retrieve data from the graph data store.
184     testGraph.returnNulls();
185     
186     // Create a vertex and store it in the graph data store.
187     ChampObject obj1 = ChampObject.create()
188         .ofType("foo")
189         .withKey("123")
190         .withProperty("p1", "v1")
191         .withProperty("p2", "v2")
192         .build();  
193     testGraph.storeObject(obj1);
194
195     // Check our simulated event stream to verify that an event log was produced.
196     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
197     
198     // Validate that we did not get an event from the stream.
199     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
200     
201     // Create a new vertex based on the one that we already created.
202     ChampObject obj2 = ChampObject.create()
203         .from(obj1)
204         .withKey("123")
205         .withProperty("p3", "v3")
206         .build();
207     
208     // Now, try doing a replace operation.
209     testGraph.replaceObject(obj2);
210     
211     // Check our simulated event stream to see if an event log was not produced.
212     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
213     
214     // Validate that we did not get an event from the stream.
215     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
216     
217     // Finally, delete the vertex.
218     testGraph.deleteObject("123");
219     
220     // Check our simulated event stream to see if an event log was not produced.
221     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
222     
223     // Validate that we did not get an event from the stream.
224     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
225   }
226   
227   
228   /**
229    * Validates that store/replace/delete operation against edges result in the expected events
230    * being published to the event stream.
231    *
232    * @throws ChampMarshallingException
233    * @throws ChampSchemaViolationException
234    * @throws ChampObjectNotExistsException
235    * @throws InterruptedException
236    * @throws JsonParseException
237    * @throws JsonMappingException
238    * @throws IOException
239    * @throws ChampUnmarshallingException
240    * @throws ChampRelationshipNotExistsException
241    */
242   @Test
243   public void edgeOperationsTest() throws ChampMarshallingException, 
244                                           ChampSchemaViolationException, 
245                                           ChampObjectNotExistsException, 
246                                           InterruptedException, 
247                                           JsonParseException, 
248                                           JsonMappingException, 
249                                           IOException, 
250                                           ChampUnmarshallingException, 
251                                           ChampRelationshipNotExistsException {
252     
253     // Create two vertices to act as the end points of our edge.
254     ChampObject obj1 = ChampObject.create()
255         .ofType("foo")
256         .withKey("123")
257         .withProperty("p1", "v1")
258         .withProperty("p2", "v2")
259         .build();  
260
261     ChampObject obj2 = ChampObject.create()
262         .ofType("bar")
263         .withKey("123")
264         .withProperty("p3", "v3")
265         .build();
266     
267     // Now, create an edge object and write it to the graph data store.
268     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
269         .property("property-1", "value-1")
270         .property("property-2", "value-2")
271         .build();
272     testGraph.storeRelationship(rel);
273     
274     // Retrieve the next event from the event stream and validate that it is what we expect.
275     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
276     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
277     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
278     
279     // Now, create another edge object based on the one we just wrote, and use it to perform
280     // a replace operation.
281     ChampRelationship rel2 = ChampRelationship.create()
282         .from(rel)
283         .withKey("123")
284         .withProperty("property-3", "value-3")
285         .build();
286     testGraph.replaceRelationship(rel2);
287
288     // Retrieve the next event from the event stream and validate that it is what we expect.
289     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
290     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
291     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
292     
293     // Finally, delete our edge.
294     testGraph.deleteRelationship(rel2);
295     
296     // Retrieve the next event from the event stream and validate that it is what we expect.
297     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
298     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
299     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
300   }
301   
302   
303   /**
304    * This test validates that performing edge operations in the case where the data to be
305    * forwarded to the event stream is unavailable results in no event being generated, but
306    * does not otherwise create issues.
307    * 
308    * @throws ChampMarshallingException
309    * @throws ChampSchemaViolationException
310    * @throws ChampObjectNotExistsException
311    * @throws InterruptedException
312    * @throws JsonParseException
313    * @throws JsonMappingException
314    * @throws IOException
315    * @throws ChampUnmarshallingException
316    * @throws ChampRelationshipNotExistsException
317    */
318   @Test
319   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
320                                                    ChampSchemaViolationException, 
321                                                    ChampObjectNotExistsException, 
322                                                    InterruptedException, 
323                                                    JsonParseException, 
324                                                    JsonMappingException, 
325                                                    IOException, 
326                                                    ChampUnmarshallingException, 
327                                                    ChampRelationshipNotExistsException {
328     
329     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
330     // events.
331     testGraph.returnNulls();
332     
333     // Create two vertices to act as the endpoints of our edge.
334     ChampObject obj1 = ChampObject.create()
335         .ofType("foo")
336         .withKey("123")
337         .withProperty("p1", "v1")
338         .withProperty("p2", "v2")
339         .build();  
340
341     ChampObject obj2 = ChampObject.create()
342         .ofType("bar")
343         .withKey("123")
344         .withProperty("p3", "v3")
345         .build();
346     
347     // Now, create an edge object and write it to the graph data store.
348     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
349         .property("property-1", "value-1")
350         .property("property-2", "value-2")
351         .build();
352     testGraph.storeRelationship(rel);
353     
354     // Check our simulated event stream to see if an event log was produced.
355     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
356     
357     // Validate that we did not get an event from the stream.
358     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
359         
360     // Now, create another edge object based on the one we just wrote, and use it to perform
361     // a replace operation.
362     ChampRelationship rel2 = ChampRelationship.create()
363         .from(rel)
364         .withKey("123")
365         .withProperty("property-3", "value-3")
366         .build();
367     testGraph.replaceRelationship(rel2);
368     
369     // Check our simulated event stream to see if an event log was produced.
370     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
371     
372     // Validate that we did not get an event from the stream.
373     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);   
374   }
375   
376   
377   /**
378    * Validates that store/replace/delete operation against partitions result in the expected events
379    * being published to the event stream.
380    *
381    * @throws ChampMarshallingException
382    * @throws ChampSchemaViolationException
383    * @throws ChampObjectNotExistsException
384    * @throws InterruptedException
385    * @throws JsonParseException
386    * @throws JsonMappingException
387    * @throws IOException
388    * @throws ChampUnmarshallingException
389    * @throws ChampRelationshipNotExistsException
390    */
391   @Test
392   public void partitionOperationsTest() throws ChampMarshallingException, 
393                                                ChampSchemaViolationException, 
394                                                ChampObjectNotExistsException, 
395                                                InterruptedException, 
396                                                JsonParseException, 
397                                                JsonMappingException, 
398                                                IOException, 
399                                                ChampUnmarshallingException, 
400                                                ChampRelationshipNotExistsException {
401     
402     // Create the vertices and edge objects that we need to create a partition.
403     ChampObject obj1 = ChampObject.create()
404         .ofType("foo")
405         .withKey("123")
406         .withProperty("p1", "v1")
407         .withProperty("p2", "v2")
408         .build();  
409
410     ChampObject obj2 = ChampObject.create()
411         .ofType("bar")
412         .withKey("123")
413         .withProperty("p3", "v3")
414         .build();
415     
416     // Now, create an edge object and write it to the graph data store.
417     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
418         .property("property-1", "value-1")
419         .property("property-2", "value-2")
420         .build();
421     
422     // Now, create our partition object and store it in the graph.
423     ChampPartition partition = ChampPartition.create()
424         .withObject(obj1)
425         .withObject(obj2)
426         .withRelationship(rel)
427         .build();
428     testGraph.storePartition(partition);
429     
430     // Retrieve the next event from the event stream and validate that it is what we expect.
431     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
432     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
433     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
434
435     // Now, delete our partition.
436     testGraph.deletePartition(partition);
437     
438     // Retrieve the next event from the event stream and validate that it is what we expect.
439     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
440     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
441     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
442   }
443   
444   
445   /**
446    * This test validates that performing partition operations in the case where the data to be
447    * forwarded to the event stream is unavailable results in no event being generated, but
448    * does not otherwise create issues.
449    * 
450    * @throws ChampMarshallingException
451    * @throws ChampSchemaViolationException
452    * @throws ChampObjectNotExistsException
453    * @throws InterruptedException
454    * @throws JsonParseException
455    * @throws JsonMappingException
456    * @throws IOException
457    * @throws ChampUnmarshallingException
458    * @throws ChampRelationshipNotExistsException
459    */
460   @Test
461   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
462                                           ChampSchemaViolationException, 
463                                           ChampObjectNotExistsException, 
464                                           InterruptedException, 
465                                           JsonParseException, 
466                                           JsonMappingException, 
467                                           IOException, 
468                                           ChampUnmarshallingException, 
469                                           ChampRelationshipNotExistsException {
470     
471     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
472     // events.
473     testGraph.returnNulls();
474     
475     // Create all of the objects we need to create a partition, and store the partition
476     // in the graph.
477     ChampObject obj1 = ChampObject.create()
478         .ofType("foo")
479         .withKey("123")
480         .withProperty("p1", "v1")
481         .withProperty("p2", "v2")
482         .build();  
483
484     ChampObject obj2 = ChampObject.create()
485         .ofType("bar")
486         .withKey("123")
487         .withProperty("p3", "v3")
488         .build();
489     
490     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
491         .property("property-1", "value-1")
492         .property("property-2", "value-2")
493         .build();
494     
495     ChampPartition partition = ChampPartition.create()
496         .withObject(obj1)
497         .withObject(obj2)
498         .withRelationship(rel)
499         .build();
500     testGraph.storePartition(partition);
501     
502     // Check our simulated event stream to see if an an event log was produced.
503     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
504     
505     // Validate that we did not get an event from the stream.
506     assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
507   }
508   
509   
510   /**
511    * Validates that store/replace/delete operation against vertex indexes result in the expected
512    * events being published to the event stream.
513    * 
514    * @throws ChampMarshallingException
515    * @throws ChampSchemaViolationException
516    * @throws ChampObjectNotExistsException
517    * @throws InterruptedException
518    * @throws JsonParseException
519    * @throws JsonMappingException
520    * @throws IOException
521    * @throws ChampUnmarshallingException
522    * @throws ChampRelationshipNotExistsException
523    * @throws ChampIndexNotExistsException
524    */
525   @Test
526   public void indexOperationsTest() throws ChampMarshallingException, 
527                                            ChampSchemaViolationException, 
528                                            ChampObjectNotExistsException, 
529                                            InterruptedException, 
530                                            JsonParseException, 
531                                            JsonMappingException, 
532                                            IOException, 
533                                            ChampUnmarshallingException, 
534                                            ChampRelationshipNotExistsException, 
535                                            ChampIndexNotExistsException {
536         
537     // Create an index object and store it in the graph.
538     ChampObjectIndex objIndex = ChampObjectIndex.create()
539         .ofName("myIndex")
540         .onType("type")
541         .forField("myField")
542         .build();
543     testGraph.storeObjectIndex(objIndex);
544     
545     // Retrieve the next event from the event stream and validate that it is what we expect.
546     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
547     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
548     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
549     
550     // Now, delete our partition.
551     testGraph.deleteObjectIndex("myIndex");
552     
553     // Retrieve the next event from the event stream and validate that it is what we expect.
554     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
555     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
556     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
557   }
558   
559   /**
560    * This test validates that performing index operations in the case where the data to be
561    * forwarded to the event stream is unavailable results in no event being generated, but
562    * does not otherwise create issues.
563    * 
564    * @throws ChampMarshallingException
565    * @throws ChampSchemaViolationException
566    * @throws ChampObjectNotExistsException
567    * @throws InterruptedException
568    * @throws JsonParseException
569    * @throws JsonMappingException
570    * @throws IOException
571    * @throws ChampUnmarshallingException
572    * @throws ChampRelationshipNotExistsException
573    * @throws ChampIndexNotExistsException
574    */
575   @Test
576   public void indexOperationsWithNullsTest() throws ChampMarshallingException, 
577                                                     ChampSchemaViolationException, 
578                                                     ChampObjectNotExistsException, 
579                                                     InterruptedException, 
580                                                     JsonParseException, 
581                                                     JsonMappingException, 
582                                                     IOException, 
583                                                     ChampUnmarshallingException, 
584                                                     ChampRelationshipNotExistsException, 
585                                                     ChampIndexNotExistsException {
586     
587     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
588     // events.
589     testGraph.returnNulls();
590     
591     // Create an index object and store it in the graph.
592     ChampObjectIndex objIndex = ChampObjectIndex.create()
593         .ofName("myIndex")
594         .onType("type")
595         .forField("myField")
596         .build();
597     testGraph.storeObjectIndex(objIndex);
598     
599     // Check our simulated event stream to see if an  an event log was produced.
600     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
601     
602     // Now, delete our index.
603     testGraph.deleteObjectIndex("myIndex");
604     
605     // Check our simulated event stream to see if an an event log was produced.
606     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
607     
608     // Validate that we did not get an event from the stream.
609     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
610   }
611   
612   
613   /**
614    * This test validates that performing relationship index operations in the case where 
615    * the data to be forwarded to the event stream is unavailable results in no event being 
616    * generated, but does not otherwise create issues.
617    * 
618    * @throws ChampMarshallingException
619    * @throws ChampSchemaViolationException
620    * @throws ChampObjectNotExistsException
621    * @throws InterruptedException
622    * @throws JsonParseException
623    * @throws JsonMappingException
624    * @throws IOException
625    * @throws ChampUnmarshallingException
626    * @throws ChampRelationshipNotExistsException
627    * @throws ChampIndexNotExistsException
628    */
629   @Test
630   public void relationshipIndexOperationsTest() throws ChampMarshallingException, 
631                                                        ChampSchemaViolationException, 
632                                                        ChampObjectNotExistsException, 
633                                                        InterruptedException, 
634                                                        JsonParseException, 
635                                                        JsonMappingException, 
636                                                        IOException, 
637                                                        ChampUnmarshallingException, 
638                                                        ChampRelationshipNotExistsException, 
639                                                        ChampIndexNotExistsException {
640         
641     // Create a relationship index object and store it in the graph.
642     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
643         .ofName("myIndex")
644         .onType("type")
645         .forField("myField")
646         .build();
647     testGraph.storeRelationshipIndex(relIndex);
648     
649     // Retrieve the next event from the event stream and validate that it is what we expect.
650     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
651     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
652     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
653     
654     // Now, delete our partition.
655     testGraph.deleteRelationshipIndex("myIndex");
656     
657     // Retrieve the next event from the event stream and validate that it is what we expect.
658     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
659     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
660     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
661   }
662   
663   
664   /**
665    * This test validates that performing index operations in the case where the data to be
666    * forwarded to the event stream is unavailable results in no event being generated, but
667    * does not otherwise create issues.
668    * 
669    * @throws ChampMarshallingException
670    * @throws ChampSchemaViolationException
671    * @throws ChampObjectNotExistsException
672    * @throws InterruptedException
673    * @throws JsonParseException
674    * @throws JsonMappingException
675    * @throws IOException
676    * @throws ChampUnmarshallingException
677    * @throws ChampRelationshipNotExistsException
678    * @throws ChampIndexNotExistsException
679    */
680   @Test
681   public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException, 
682                                                                 ChampSchemaViolationException, 
683                                                                 ChampObjectNotExistsException, 
684                                                                 InterruptedException, 
685                                                                 JsonParseException, 
686                                                                 JsonMappingException, 
687                                                                 IOException, 
688                                                                 ChampUnmarshallingException, 
689                                                                 ChampRelationshipNotExistsException, 
690                                                                 ChampIndexNotExistsException {
691     
692     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
693     // events.
694     testGraph.returnNulls();
695     
696     // Create a relationship index object and store it in the graph.
697     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
698         .ofName("myIndex")
699         .onType("type")
700         .forField("myField")
701         .build();
702     
703     testGraph.storeRelationshipIndex(relIndex);
704     
705     // Check our simulated event stream to see if an an event log was produced.
706     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
707     
708     // Now, delete our index.
709     testGraph.deleteRelationshipIndex("myIndex");
710     
711     // Check our simulated event stream to see if an event log was produced.
712     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
713     
714     // Validate that we did not get an event from the stream.
715     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
716   }
717       
718   
719   /**
720    * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which 
721    * we can use to validate that log events get generated without worrying about having a real
722    * underlying graph.
723    */
724   private class TestGraph extends AbstractLoggingChampGraph {
725     
726     /** If set, this causes simulated retrieve operations to fail. */
727     private boolean returnNulls = false;
728     
729     
730     protected TestGraph(Map<String, Object> properties, CambriaPublisher producer) {
731       super(properties);
732       
733       setProducer(producer);
734     }
735
736     public void returnNulls() {
737       returnNulls = true;
738     }
739     
740     @Override 
741     public void shutdown() {
742       if(returnNulls) {
743         publisherPool = null;
744       }
745       super.shutdown();
746     }
747     
748     @Override
749     public ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException,
750                                                                      ChampSchemaViolationException, 
751                                                                      ChampObjectNotExistsException {
752       if(!returnNulls) {
753         return object;
754       } else {
755         return null;
756       }
757     }
758
759     @Override
760     public ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException,
761                                                                        ChampSchemaViolationException, 
762                                                                        ChampObjectNotExistsException {
763       if(!returnNulls) {
764         return object;
765       } else {
766         return null;
767       }
768     }
769
770     @Override
771     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
772       
773       if(!returnNulls) {
774         return(Optional.of(ChampObject.create()
775                             .ofType("foo")
776                             .withKey(key)
777                             .build()));  
778       } else {
779         return Optional.empty();
780       }
781     }
782
783     @Override
784     public void executeDeleteObject(Object key) throws ChampObjectNotExistsException {
785    
786     }
787
788     @Override
789     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
790       // Not used by any tests.
791       return null;
792     }
793
794     @Override
795     public ChampRelationship executeStoreRelationship(ChampRelationship relationship) 
796         throws ChampUnmarshallingException, 
797                ChampMarshallingException, 
798                ChampObjectNotExistsException, 
799                ChampSchemaViolationException,
800                ChampRelationshipNotExistsException {
801
802       if(!returnNulls) {
803         return relationship;
804       } else {
805         return null;
806       }
807     }
808
809     @Override
810     public ChampRelationship executeReplaceRelationship(ChampRelationship relationship)
811         throws ChampUnmarshallingException, 
812                ChampMarshallingException,
813                ChampSchemaViolationException, 
814                ChampRelationshipNotExistsException {
815
816       if(!returnNulls) {
817         return relationship;
818       } else {
819         return null;
820       }
821     }
822
823     @Override
824     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
825       // Not used by any tests.
826       return null;
827     }
828
829     @Override
830     public void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
831       // Not used by any tests.   
832     }
833
834     @Override
835     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
836         throws ChampUnmarshallingException, ChampObjectNotExistsException {
837       
838       // Not used by any tests.
839       return null;
840     }
841
842     @Override
843     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
844       
845       // Not used by any tests.
846       return null;
847     }
848
849     @Override
850     public ChampPartition executeStorePartition(ChampPartition partition) 
851         throws ChampSchemaViolationException, 
852                ChampRelationshipNotExistsException,
853                ChampMarshallingException, 
854                ChampObjectNotExistsException {
855
856       if(!returnNulls) {
857         return partition;
858       } else {
859         return null;
860       }
861     }
862
863     @Override
864     public void executeDeletePartition(ChampPartition graph) {
865       // Not used by any tests.     
866     }
867
868     @Override
869     public void executeStoreObjectIndex(ChampObjectIndex index) {
870       // Not used by any tests.    
871     }
872
873     @Override
874     public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
875       
876       if(!returnNulls) {
877         return Optional.of(ChampObjectIndex.create()
878                             .ofName(indexName)
879                             .onType("doesnt matter")
880                             .forField("doesnt matter")
881                             .build());
882       } else {
883         return Optional.empty();
884       }
885     }
886
887     @Override
888     public Stream<ChampObjectIndex> retrieveObjectIndices() {
889       // Not used by any tests.
890       return null;
891     }
892
893     @Override
894     public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
895       // Not used by any tests.    
896     }
897
898     @Override
899     public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
900       // Not used by any tests.  
901     }
902
903     @Override
904     public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
905       if(!returnNulls) {
906         return Optional.of(ChampRelationshipIndex.create()
907                             .ofName(indexName)
908                             .onType("doesnt matter")
909                             .forField("doesnt matter")
910                             .build());
911       } else {
912         return Optional.empty();
913       }
914     }
915
916     @Override
917     public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
918       // Not used by any tests.
919       return null;
920     }
921
922     @Override
923     public void executeDeleteRelationshipIndex(String indexName)
924         throws ChampIndexNotExistsException {
925       // Not used by any tests.    
926     }
927
928     @Override
929     public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
930       // Not used by any tests.    
931     }
932
933     @Override
934     public ChampSchema retrieveSchema() {
935       // Not used by any tests.
936       return null;
937     }
938
939     @Override
940     public void updateSchema(ChampObjectConstraint objectConstraint)
941         throws ChampSchemaViolationException {
942       // Not used by any tests.    
943     }
944
945     @Override
946     public void updateSchema(ChampRelationshipConstraint schema)
947         throws ChampSchemaViolationException {
948       // Not used by any tests.     
949     }
950
951     @Override
952     public void deleteSchema() {
953       // Not used by any tests.  
954     }
955
956     @Override
957     public ChampCapabilities capabilities() {
958       // Not used by any tests.
959       return null;
960     }
961   }
962   
963   private class InMemoryPublisher implements CambriaPublisher {
964
965     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
966     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
967     
968     private boolean failMode=false;
969     
970     public void enterFailMode() {
971       failMode=true;
972     }
973     
974     @Override
975     public void logTo(Logger log) {
976       // Not used by any tests. 
977     }
978
979     @Override
980     public void setApiCredentials(String apiKey, String apiSecret) {
981       // Not used by any tests.  
982     }
983
984     @Override
985     public void clearApiCredentials() {
986       // Not used by any tests.  
987     }
988
989     @Override
990     public void setHttpBasicCredentials(String username, String password) {
991       // Not used by any tests.  
992     }
993
994     @Override
995     public void clearHttpBasicCredentials() {
996       // Not used by any tests.  
997     }
998
999     @Override
1000     public int send(String partition, String msg) throws IOException {
1001       
1002       if(!failMode) {
1003         eventStream.add(msg);
1004         return 0;
1005       } else {
1006         failedMsgs.add(msg);
1007         throw new IOException("nope");
1008       }
1009     }
1010
1011     @Override
1012     public int send(message msg) throws IOException {
1013       eventStream.add(msg.toString());
1014       return 0;
1015     }
1016
1017     @Override
1018     public int send(Collection<message> msgs) throws IOException {
1019       for(message msg : msgs) {
1020         eventStream.add(msg.toString());
1021       }
1022       return 0;
1023     }
1024
1025     @Override
1026     public void close() {
1027       // Not used by any tests.
1028     }     
1029   }
1030 }