Merge "Added @Override annotation above signature"
[aai/champ.git] / src / test / java / org / openecomp / aai / champ / event / AbstractLoggingChampGraphTest.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.openecomp.aai.champ.event;
23
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.concurrent.ArrayBlockingQueue;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.stream.Stream;
36
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.openecomp.aai.champ.ChampCapabilities;
41 import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
42 import org.openecomp.aai.champ.exceptions.ChampMarshallingException;
43 import org.openecomp.aai.champ.exceptions.ChampObjectNotExistsException;
44 import org.openecomp.aai.champ.exceptions.ChampRelationshipNotExistsException;
45 import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
46 import org.openecomp.aai.champ.exceptions.ChampUnmarshallingException;
47 import org.openecomp.aai.champ.model.ChampObject;
48 import org.openecomp.aai.champ.model.ChampObjectConstraint;
49 import org.openecomp.aai.champ.model.ChampObjectIndex;
50 import org.openecomp.aai.champ.model.ChampPartition;
51 import org.openecomp.aai.champ.model.ChampRelationship;
52 import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
53 import org.openecomp.aai.champ.model.ChampRelationshipIndex;
54 import org.openecomp.aai.champ.model.ChampSchema;
55 import org.slf4j.Logger;
56
57 import com.att.nsa.cambria.client.CambriaPublisher;
58 import com.fasterxml.jackson.core.JsonParseException;
59 import com.fasterxml.jackson.databind.JsonMappingException;
60
61
62 public class AbstractLoggingChampGraphTest {
63
64   /** Event stream producer stub. */
65   private InMemoryPublisher producer;
66   
67   /** In memory graph for testing purposes. */
68   private TestGraph testGraph;
69   
70   
71   /**
72    * Perform any setup tasks that need to be done prior to each test.
73    */
74   @Before
75   public void setup() {
76     
77     // Instantiate an event stream producer stub to use in our tests.
78     producer = new InMemoryPublisher();
79     
80     // Instantiate an 'in-memory' graph for test purposes.
81     Map<String, Object> graphProperties = new HashMap<String, Object>();
82     graphProperties.put("champ.event.stream.hosts", "myeventstreamhost");
83     graphProperties.put("champ.event.stream.batch-size", 1);
84     testGraph = new TestGraph(graphProperties, producer);
85   }
86   
87   
88   /**
89    * Perform any cleanup that needs to be done after each test.
90    */
91   @After
92   public void tearDown() {
93     
94     // Close our stubbed producer and graph.
95     producer.close();
96     testGraph.shutdown();
97   }
98   
99  
100   /**
101    * Validates that store/replace/delete operation against vertices result in the expected events
102    * being published to the event stream.
103    * 
104    * @throws ChampMarshallingException
105    * @throws ChampSchemaViolationException
106    * @throws ChampObjectNotExistsException
107    * @throws InterruptedException
108    * @throws JsonParseException
109    * @throws JsonMappingException
110    * @throws IOException
111    */
112   @Test
113   public void vertexOperationsTest() throws ChampMarshallingException, 
114                                             ChampSchemaViolationException, 
115                                             ChampObjectNotExistsException, 
116                                             InterruptedException, 
117                                             JsonParseException, 
118                                             JsonMappingException, 
119                                             IOException {
120             
121     // Create a vertex and store it in the graph data store.
122     ChampObject obj1 = ChampObject.create()
123         .ofType("foo")
124         .withKey("123")
125         .withProperty("p1", "v1")
126         .withProperty("p2", "v2")
127         .build();  
128     testGraph.storeObject(obj1);
129
130     // Retrieve the next event from the event stream and validate that it is what we expect.
131     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
132     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
133     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
134   
135     // Create a new vertex based on the one that we already created.
136     ChampObject obj2 = ChampObject.create()
137         .from(obj1)
138         .withKey("123")
139         .withProperty("p3", "v3")
140         .build();
141     
142     // Now, try doing a replace operation.
143     testGraph.replaceObject(obj2);
144     
145     // Retrieve the next event from the event stream and validate that it is what we expect.
146     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
147     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
148     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
149     
150     // Finally, delete the vertex.
151     testGraph.deleteObject("123");
152     
153     // Retrieve the next event from the event stream and validate that it is what we expect.
154     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
155     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
156     assertTrue("Entity type for store event was not a vertex.", loggedEventStr.contains("vertex"));
157   }
158   
159   
160   /**
161    * This test validates that performing vertex operations in the case where the data to be
162    * forwarded to the event stream is unavailable results in no event being generated, but
163    * does not otherwise create issues.
164    * 
165    * @throws ChampMarshallingException
166    * @throws ChampSchemaViolationException
167    * @throws ChampObjectNotExistsException
168    * @throws InterruptedException
169    * @throws JsonParseException
170    * @throws JsonMappingException
171    * @throws IOException
172    */
173   @Test
174   public void vertexOperationsWithNullsTest() throws ChampMarshallingException, 
175                                                      ChampSchemaViolationException, 
176                                                      ChampObjectNotExistsException, 
177                                                      InterruptedException, 
178                                                      JsonParseException, 
179                                                      JsonMappingException, 
180                                                      IOException {
181             
182     // Setup our test graph to simulate failures to retrieve data from the graph data store.
183     testGraph.returnNulls();
184     
185     // Create a vertex and store it in the graph data store.
186     ChampObject obj1 = ChampObject.create()
187         .ofType("foo")
188         .withKey("123")
189         .withProperty("p1", "v1")
190         .withProperty("p2", "v2")
191         .build();  
192     testGraph.storeObject(obj1);
193
194     // Check our simulated event stream to verify that an event log was produced.
195     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
196     
197     // Validate that we did not get an event from the stream.
198     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
199     
200     // Create a new vertex based on the one that we already created.
201     ChampObject obj2 = ChampObject.create()
202         .from(obj1)
203         .withKey("123")
204         .withProperty("p3", "v3")
205         .build();
206     
207     // Now, try doing a replace operation.
208     testGraph.replaceObject(obj2);
209     
210     // Check our simulated event stream to see if an event log was not produced.
211     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
212     
213     // Validate that we did not get an event from the stream.
214     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
215     
216     // Finally, delete the vertex.
217     testGraph.deleteObject("123");
218     
219     // Check our simulated event stream to see if an event log was not produced.
220     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
221     
222     // Validate that we did not get an event from the stream.
223     assertNull("Store vertex event should not have been logged to the event stream", loggedEventStr);
224   }
225   
226   
227   /**
228    * Validates that store/replace/delete operation against edges result in the expected events
229    * being published to the event stream.
230    *
231    * @throws ChampMarshallingException
232    * @throws ChampSchemaViolationException
233    * @throws ChampObjectNotExistsException
234    * @throws InterruptedException
235    * @throws JsonParseException
236    * @throws JsonMappingException
237    * @throws IOException
238    * @throws ChampUnmarshallingException
239    * @throws ChampRelationshipNotExistsException
240    */
241   @Test
242   public void edgeOperationsTest() throws ChampMarshallingException, 
243                                           ChampSchemaViolationException, 
244                                           ChampObjectNotExistsException, 
245                                           InterruptedException, 
246                                           JsonParseException, 
247                                           JsonMappingException, 
248                                           IOException, 
249                                           ChampUnmarshallingException, 
250                                           ChampRelationshipNotExistsException {
251     
252     // Create two vertices to act as the end points of our edge.
253     ChampObject obj1 = ChampObject.create()
254         .ofType("foo")
255         .withKey("123")
256         .withProperty("p1", "v1")
257         .withProperty("p2", "v2")
258         .build();  
259
260     ChampObject obj2 = ChampObject.create()
261         .ofType("bar")
262         .withKey("123")
263         .withProperty("p3", "v3")
264         .build();
265     
266     // Now, create an edge object and write it to the graph data store.
267     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
268         .property("property-1", "value-1")
269         .property("property-2", "value-2")
270         .build();
271     testGraph.storeRelationship(rel);
272     
273     // Retrieve the next event from the event stream and validate that it is what we expect.
274     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
275     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
276     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
277     
278     // Now, create another edge object based on the one we just wrote, and use it to perform
279     // a replace operation.
280     ChampRelationship rel2 = ChampRelationship.create()
281         .from(rel)
282         .withKey("123")
283         .withProperty("property-3", "value-3")
284         .build();
285     testGraph.replaceRelationship(rel2);
286
287     // Retrieve the next event from the event stream and validate that it is what we expect.
288     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
289     assertTrue("Expected REPLACE event.", loggedEventStr.contains("REPLACE"));
290     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
291     
292     // Finally, delete our edge.
293     testGraph.deleteRelationship(rel2);
294     
295     // Retrieve the next event from the event stream and validate that it is what we expect.
296     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
297     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
298     assertTrue("Entity type for store event was not an edge.", loggedEventStr.contains("relationship"));
299   }
300   
301   
302   /**
303    * This test validates that performing edge operations in the case where the data to be
304    * forwarded to the event stream is unavailable results in no event being generated, but
305    * does not otherwise create issues.
306    * 
307    * @throws ChampMarshallingException
308    * @throws ChampSchemaViolationException
309    * @throws ChampObjectNotExistsException
310    * @throws InterruptedException
311    * @throws JsonParseException
312    * @throws JsonMappingException
313    * @throws IOException
314    * @throws ChampUnmarshallingException
315    * @throws ChampRelationshipNotExistsException
316    */
317   @Test
318   public void edgeOperationsWithNullsTest() throws ChampMarshallingException, 
319                                                    ChampSchemaViolationException, 
320                                                    ChampObjectNotExistsException, 
321                                                    InterruptedException, 
322                                                    JsonParseException, 
323                                                    JsonMappingException, 
324                                                    IOException, 
325                                                    ChampUnmarshallingException, 
326                                                    ChampRelationshipNotExistsException {
327     
328     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
329     // events.
330     testGraph.returnNulls();
331     
332     // Create two vertices to act as the endpoints of our edge.
333     ChampObject obj1 = ChampObject.create()
334         .ofType("foo")
335         .withKey("123")
336         .withProperty("p1", "v1")
337         .withProperty("p2", "v2")
338         .build();  
339
340     ChampObject obj2 = ChampObject.create()
341         .ofType("bar")
342         .withKey("123")
343         .withProperty("p3", "v3")
344         .build();
345     
346     // Now, create an edge object and write it to the graph data store.
347     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
348         .property("property-1", "value-1")
349         .property("property-2", "value-2")
350         .build();
351     testGraph.storeRelationship(rel);
352     
353     // Check our simulated event stream to see if an event log was produced.
354     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
355     
356     // Validate that we did not get an event from the stream.
357     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);
358         
359     // Now, create another edge object based on the one we just wrote, and use it to perform
360     // a replace operation.
361     ChampRelationship rel2 = ChampRelationship.create()
362         .from(rel)
363         .withKey("123")
364         .withProperty("property-3", "value-3")
365         .build();
366     testGraph.replaceRelationship(rel2);
367     
368     // Check our simulated event stream to see if an event log was produced.
369     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
370     
371     // Validate that we did not get an event from the stream.
372     assertNull("Store edge event should not have been logged to the event stream", loggedEventStr);   
373   }
374   
375   
376   /**
377    * Validates that store/replace/delete operation against partitions result in the expected events
378    * being published to the event stream.
379    *
380    * @throws ChampMarshallingException
381    * @throws ChampSchemaViolationException
382    * @throws ChampObjectNotExistsException
383    * @throws InterruptedException
384    * @throws JsonParseException
385    * @throws JsonMappingException
386    * @throws IOException
387    * @throws ChampUnmarshallingException
388    * @throws ChampRelationshipNotExistsException
389    */
390   @Test
391   public void partitionOperationsTest() throws ChampMarshallingException, 
392                                                ChampSchemaViolationException, 
393                                                ChampObjectNotExistsException, 
394                                                InterruptedException, 
395                                                JsonParseException, 
396                                                JsonMappingException, 
397                                                IOException, 
398                                                ChampUnmarshallingException, 
399                                                ChampRelationshipNotExistsException {
400     
401     // Create the vertices and edge objects that we need to create a partition.
402     ChampObject obj1 = ChampObject.create()
403         .ofType("foo")
404         .withKey("123")
405         .withProperty("p1", "v1")
406         .withProperty("p2", "v2")
407         .build();  
408
409     ChampObject obj2 = ChampObject.create()
410         .ofType("bar")
411         .withKey("123")
412         .withProperty("p3", "v3")
413         .build();
414     
415     // Now, create an edge object and write it to the graph data store.
416     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
417         .property("property-1", "value-1")
418         .property("property-2", "value-2")
419         .build();
420     
421     // Now, create our partition object and store it in the graph.
422     ChampPartition partition = ChampPartition.create()
423         .withObject(obj1)
424         .withObject(obj2)
425         .withRelationship(rel)
426         .build();
427     testGraph.storePartition(partition);
428     
429     // Retrieve the next event from the event stream and validate that it is what we expect.
430     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
431     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
432     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
433
434     // Now, delete our partition.
435     testGraph.deletePartition(partition);
436     
437     // Retrieve the next event from the event stream and validate that it is what we expect.
438     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
439     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
440     assertTrue("Entity type for store event was not a partition.", loggedEventStr.contains("partition"));
441   }
442   
443   
444   /**
445    * This test validates that performing partition operations in the case where the data to be
446    * forwarded to the event stream is unavailable results in no event being generated, but
447    * does not otherwise create issues.
448    * 
449    * @throws ChampMarshallingException
450    * @throws ChampSchemaViolationException
451    * @throws ChampObjectNotExistsException
452    * @throws InterruptedException
453    * @throws JsonParseException
454    * @throws JsonMappingException
455    * @throws IOException
456    * @throws ChampUnmarshallingException
457    * @throws ChampRelationshipNotExistsException
458    */
459   @Test
460   public void partitionOperationsWithNullsTest() throws ChampMarshallingException, 
461                                           ChampSchemaViolationException, 
462                                           ChampObjectNotExistsException, 
463                                           InterruptedException, 
464                                           JsonParseException, 
465                                           JsonMappingException, 
466                                           IOException, 
467                                           ChampUnmarshallingException, 
468                                           ChampRelationshipNotExistsException {
469     
470     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
471     // events.
472     testGraph.returnNulls();
473     
474     // Create all of the objects we need to create a partition, and store the partition
475     // in the graph.
476     ChampObject obj1 = ChampObject.create()
477         .ofType("foo")
478         .withKey("123")
479         .withProperty("p1", "v1")
480         .withProperty("p2", "v2")
481         .build();  
482
483     ChampObject obj2 = ChampObject.create()
484         .ofType("bar")
485         .withKey("123")
486         .withProperty("p3", "v3")
487         .build();
488     
489     ChampRelationship rel = new ChampRelationship.Builder(obj1, obj2, "relationship")
490         .property("property-1", "value-1")
491         .property("property-2", "value-2")
492         .build();
493     
494     ChampPartition partition = ChampPartition.create()
495         .withObject(obj1)
496         .withObject(obj2)
497         .withRelationship(rel)
498         .build();
499     testGraph.storePartition(partition);
500     
501     // Check our simulated event stream to see if an an event log was produced.
502     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
503     
504     // Validate that we did not get an event from the stream.
505     assertNull("Store partition event should not have been logged to the event stream", loggedEventStr);
506   }
507   
508   
509   /**
510    * Validates that store/replace/delete operation against vertex indexes result in the expected
511    * events being published to the event stream.
512    * 
513    * @throws ChampMarshallingException
514    * @throws ChampSchemaViolationException
515    * @throws ChampObjectNotExistsException
516    * @throws InterruptedException
517    * @throws JsonParseException
518    * @throws JsonMappingException
519    * @throws IOException
520    * @throws ChampUnmarshallingException
521    * @throws ChampRelationshipNotExistsException
522    * @throws ChampIndexNotExistsException
523    */
524   @Test
525   public void indexOperationsTest() throws ChampMarshallingException, 
526                                            ChampSchemaViolationException, 
527                                            ChampObjectNotExistsException, 
528                                            InterruptedException, 
529                                            JsonParseException, 
530                                            JsonMappingException, 
531                                            IOException, 
532                                            ChampUnmarshallingException, 
533                                            ChampRelationshipNotExistsException, 
534                                            ChampIndexNotExistsException {
535         
536     // Create an index object and store it in the graph.
537     ChampObjectIndex objIndex = ChampObjectIndex.create()
538         .ofName("myIndex")
539         .onType("type")
540         .forField("myField")
541         .build();
542     testGraph.storeObjectIndex(objIndex);
543     
544     // Retrieve the next event from the event stream and validate that it is what we expect.
545     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
546     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
547     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
548     
549     // Now, delete our partition.
550     testGraph.deleteObjectIndex("myIndex");
551     
552     // Retrieve the next event from the event stream and validate that it is what we expect.
553     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
554     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
555     assertTrue("Entity type for store event was not a vertex index.", loggedEventStr.contains("objectIndex"));
556   }
557   
558   /**
559    * This test validates that performing index operations in the case where the data to be
560    * forwarded to the event stream is unavailable results in no event being generated, but
561    * does not otherwise create issues.
562    * 
563    * @throws ChampMarshallingException
564    * @throws ChampSchemaViolationException
565    * @throws ChampObjectNotExistsException
566    * @throws InterruptedException
567    * @throws JsonParseException
568    * @throws JsonMappingException
569    * @throws IOException
570    * @throws ChampUnmarshallingException
571    * @throws ChampRelationshipNotExistsException
572    * @throws ChampIndexNotExistsException
573    */
574   @Test
575   public void indexOperationsWithNullsTest() throws ChampMarshallingException, 
576                                                     ChampSchemaViolationException, 
577                                                     ChampObjectNotExistsException, 
578                                                     InterruptedException, 
579                                                     JsonParseException, 
580                                                     JsonMappingException, 
581                                                     IOException, 
582                                                     ChampUnmarshallingException, 
583                                                     ChampRelationshipNotExistsException, 
584                                                     ChampIndexNotExistsException {
585     
586     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
587     // events.
588     testGraph.returnNulls();
589     
590     // Create an index object and store it in the graph.
591     ChampObjectIndex objIndex = ChampObjectIndex.create()
592         .ofName("myIndex")
593         .onType("type")
594         .forField("myField")
595         .build();
596     testGraph.storeObjectIndex(objIndex);
597     
598     // Check our simulated event stream to see if an  an event log was produced.
599     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
600     
601     // Now, delete our index.
602     testGraph.deleteObjectIndex("myIndex");
603     
604     // Check our simulated event stream to see if an an event log was produced.
605     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
606     
607     // Validate that we did not get an event from the stream.
608     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
609   }
610   
611   
612   /**
613    * This test validates that performing relationship index operations in the case where 
614    * the data to be forwarded to the event stream is unavailable results in no event being 
615    * generated, but does not otherwise create issues.
616    * 
617    * @throws ChampMarshallingException
618    * @throws ChampSchemaViolationException
619    * @throws ChampObjectNotExistsException
620    * @throws InterruptedException
621    * @throws JsonParseException
622    * @throws JsonMappingException
623    * @throws IOException
624    * @throws ChampUnmarshallingException
625    * @throws ChampRelationshipNotExistsException
626    * @throws ChampIndexNotExistsException
627    */
628   @Test
629   public void relationshipIndexOperationsTest() throws ChampMarshallingException, 
630                                                        ChampSchemaViolationException, 
631                                                        ChampObjectNotExistsException, 
632                                                        InterruptedException, 
633                                                        JsonParseException, 
634                                                        JsonMappingException, 
635                                                        IOException, 
636                                                        ChampUnmarshallingException, 
637                                                        ChampRelationshipNotExistsException, 
638                                                        ChampIndexNotExistsException {
639         
640     // Create a relationship index object and store it in the graph.
641     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
642         .ofName("myIndex")
643         .onType("type")
644         .forField("myField")
645         .build();
646     testGraph.storeRelationshipIndex(relIndex);
647     
648     // Retrieve the next event from the event stream and validate that it is what we expect.
649     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
650     assertTrue("Expected STORE event.", loggedEventStr.contains("STORE"));
651     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
652     
653     // Now, delete our partition.
654     testGraph.deleteRelationshipIndex("myIndex");
655     
656     // Retrieve the next event from the event stream and validate that it is what we expect.
657     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
658     assertTrue("Expected DELETE event.", loggedEventStr.contains("DELETE"));
659     assertTrue("Entity type for store event was not a relationship index.", loggedEventStr.contains("relationshipIndex"));
660   }
661   
662   
663   /**
664    * This test validates that performing index operations in the case where the data to be
665    * forwarded to the event stream is unavailable results in no event being generated, but
666    * does not otherwise create issues.
667    * 
668    * @throws ChampMarshallingException
669    * @throws ChampSchemaViolationException
670    * @throws ChampObjectNotExistsException
671    * @throws InterruptedException
672    * @throws JsonParseException
673    * @throws JsonMappingException
674    * @throws IOException
675    * @throws ChampUnmarshallingException
676    * @throws ChampRelationshipNotExistsException
677    * @throws ChampIndexNotExistsException
678    */
679   @Test
680   public void relationshipIndexOperationsWithNullsTest() throws ChampMarshallingException, 
681                                                                 ChampSchemaViolationException, 
682                                                                 ChampObjectNotExistsException, 
683                                                                 InterruptedException, 
684                                                                 JsonParseException, 
685                                                                 JsonMappingException, 
686                                                                 IOException, 
687                                                                 ChampUnmarshallingException, 
688                                                                 ChampRelationshipNotExistsException, 
689                                                                 ChampIndexNotExistsException {
690     
691     // Set up our graph to simulate a failure to retrieve some of the data we need to generate
692     // events.
693     testGraph.returnNulls();
694     
695     // Create a relationship index object and store it in the graph.
696     ChampRelationshipIndex relIndex = ChampRelationshipIndex.create()
697         .ofName("myIndex")
698         .onType("type")
699         .forField("myField")
700         .build();
701     
702     testGraph.storeRelationshipIndex(relIndex);
703     
704     // Check our simulated event stream to see if an an event log was produced.
705     String loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
706     
707     // Now, delete our index.
708     testGraph.deleteRelationshipIndex("myIndex");
709     
710     // Check our simulated event stream to see if an event log was produced.
711     loggedEventStr = producer.eventStream.poll(5000, TimeUnit.MILLISECONDS);
712     
713     // Validate that we did not get an event from the stream.
714     assertNull("Delete partition event should not have been logged to the event stream", loggedEventStr);
715   }
716       
717   
718   /**
719    * This is a simple graph stub that extends our {@link AbstractLoggingChampGraph} class which 
720    * we can use to validate that log events get generated without worrying about having a real
721    * underlying graph.
722    */
723   private class TestGraph extends AbstractLoggingChampGraph {
724     
725     /** If set, this causes simulated retrieve operations to fail. */
726     private boolean returnNulls = false;
727     
728     
729     protected TestGraph(Map<String, Object> properties, CambriaPublisher producer) {
730       super(properties);
731       
732       setProducer(producer);
733     }
734
735     public void returnNulls() {
736       returnNulls = true;
737     }
738     
739     @Override 
740     public void shutdown() {
741       if(returnNulls) {
742         publisherPool = null;
743       }
744       super.shutdown();
745     }
746     
747     @Override
748     public ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException,
749                                                                      ChampSchemaViolationException, 
750                                                                      ChampObjectNotExistsException {
751       if(!returnNulls) {
752         return object;
753       } else {
754         return null;
755       }
756     }
757
758     @Override
759     public ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException,
760                                                                        ChampSchemaViolationException, 
761                                                                        ChampObjectNotExistsException {
762       if(!returnNulls) {
763         return object;
764       } else {
765         return null;
766       }
767     }
768
769     @Override
770     public Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException {
771       
772       if(!returnNulls) {
773         return(Optional.of(ChampObject.create()
774                             .ofType("foo")
775                             .withKey(key)
776                             .build()));  
777       } else {
778         return Optional.empty();
779       }
780     }
781
782     @Override
783     public void executeDeleteObject(Object key) throws ChampObjectNotExistsException {
784    
785     }
786
787     @Override
788     public Stream<ChampObject> queryObjects(Map<String, Object> queryParams) {
789       // Not used by any tests.
790       return null;
791     }
792
793     @Override
794     public ChampRelationship executeStoreRelationship(ChampRelationship relationship) 
795         throws ChampUnmarshallingException, 
796                ChampMarshallingException, 
797                ChampObjectNotExistsException, 
798                ChampSchemaViolationException,
799                ChampRelationshipNotExistsException {
800
801       if(!returnNulls) {
802         return relationship;
803       } else {
804         return null;
805       }
806     }
807
808     @Override
809     public ChampRelationship executeReplaceRelationship(ChampRelationship relationship)
810         throws ChampUnmarshallingException, 
811                ChampMarshallingException,
812                ChampSchemaViolationException, 
813                ChampRelationshipNotExistsException {
814
815       if(!returnNulls) {
816         return relationship;
817       } else {
818         return null;
819       }
820     }
821
822     @Override
823     public Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException {
824       // Not used by any tests.
825       return null;
826     }
827
828     @Override
829     public void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
830       // Not used by any tests.   
831     }
832
833     @Override
834     public Stream<ChampRelationship> retrieveRelationships(ChampObject object)
835         throws ChampUnmarshallingException, ChampObjectNotExistsException {
836       
837       // Not used by any tests.
838       return null;
839     }
840
841     @Override
842     public Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams) {
843       
844       // Not used by any tests.
845       return null;
846     }
847
848     @Override
849     public ChampPartition executeStorePartition(ChampPartition partition) 
850         throws ChampSchemaViolationException, 
851                ChampRelationshipNotExistsException,
852                ChampMarshallingException, 
853                ChampObjectNotExistsException {
854
855       if(!returnNulls) {
856         return partition;
857       } else {
858         return null;
859       }
860     }
861
862     @Override
863     public void executeDeletePartition(ChampPartition graph) {
864       // Not used by any tests.     
865     }
866
867     @Override
868     public void executeStoreObjectIndex(ChampObjectIndex index) {
869       // Not used by any tests.    
870     }
871
872     @Override
873     public Optional<ChampObjectIndex> retrieveObjectIndex(String indexName) {
874       
875       if(!returnNulls) {
876         return Optional.of(ChampObjectIndex.create()
877                             .ofName(indexName)
878                             .onType("doesnt matter")
879                             .forField("doesnt matter")
880                             .build());
881       } else {
882         return Optional.empty();
883       }
884     }
885
886     @Override
887     public Stream<ChampObjectIndex> retrieveObjectIndices() {
888       // Not used by any tests.
889       return null;
890     }
891
892     @Override
893     public void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
894       // Not used by any tests.    
895     }
896
897     @Override
898     public void executeStoreRelationshipIndex(ChampRelationshipIndex index) {
899       // Not used by any tests.  
900     }
901
902     @Override
903     public Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName) {
904       if(!returnNulls) {
905         return Optional.of(ChampRelationshipIndex.create()
906                             .ofName(indexName)
907                             .onType("doesnt matter")
908                             .forField("doesnt matter")
909                             .build());
910       } else {
911         return Optional.empty();
912       }
913     }
914
915     @Override
916     public Stream<ChampRelationshipIndex> retrieveRelationshipIndices() {
917       // Not used by any tests.
918       return null;
919     }
920
921     @Override
922     public void executeDeleteRelationshipIndex(String indexName)
923         throws ChampIndexNotExistsException {
924       // Not used by any tests.    
925     }
926
927     @Override
928     public void storeSchema(ChampSchema schema) throws ChampSchemaViolationException {
929       // Not used by any tests.    
930     }
931
932     @Override
933     public ChampSchema retrieveSchema() {
934       // Not used by any tests.
935       return null;
936     }
937
938     @Override
939     public void updateSchema(ChampObjectConstraint objectConstraint)
940         throws ChampSchemaViolationException {
941       // Not used by any tests.    
942     }
943
944     @Override
945     public void updateSchema(ChampRelationshipConstraint schema)
946         throws ChampSchemaViolationException {
947       // Not used by any tests.     
948     }
949
950     @Override
951     public void deleteSchema() {
952       // Not used by any tests.  
953     }
954
955     @Override
956     public ChampCapabilities capabilities() {
957       // Not used by any tests.
958       return null;
959     }
960   }
961   
962   private class InMemoryPublisher implements CambriaPublisher {
963
964     public BlockingQueue<String> eventStream = new ArrayBlockingQueue<String>(50);
965     public BlockingQueue<String> failedMsgs = new ArrayBlockingQueue<String>(10);
966     
967     private boolean failMode=false;
968     
969     public void enterFailMode() {
970       failMode=true;
971     }
972     
973     @Override
974     public void logTo(Logger log) {
975       // Not used by any tests. 
976     }
977
978     @Override
979     public void setApiCredentials(String apiKey, String apiSecret) {
980       // Not used by any tests.  
981     }
982
983     @Override
984     public void clearApiCredentials() {
985       // Not used by any tests.  
986     }
987
988     @Override
989     public void setHttpBasicCredentials(String username, String password) {
990       // Not used by any tests.  
991     }
992
993     @Override
994     public void clearHttpBasicCredentials() {
995       // Not used by any tests.  
996     }
997
998     @Override
999     public int send(String partition, String msg) throws IOException {
1000       
1001       if(!failMode) {
1002         eventStream.add(msg);
1003         return 0;
1004       } else {
1005         failedMsgs.add(msg);
1006         throw new IOException("nope");
1007       }
1008     }
1009
1010     @Override
1011     public int send(message msg) throws IOException {
1012       eventStream.add(msg.toString());
1013       return 0;
1014     }
1015
1016     @Override
1017     public int send(Collection<message> msgs) throws IOException {
1018       for(message msg : msgs) {
1019         eventStream.add(msg.toString());
1020       }
1021       return 0;
1022     }
1023
1024     @Override
1025     public void close() {
1026       // Not used by any tests.
1027     }     
1028   }
1029 }