Merge "Add the @Override annotation"
[aai/champ.git] / src / main / java / org / onap / aai / champ / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.event;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.Map;
28 import java.util.Optional;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Stream;
34
35 import org.onap.aai.champ.ChampCapabilities;
36 import org.onap.aai.champ.ChampGraph;
37 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
38 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champ.exceptions.ChampMarshallingException;
40 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
44 import org.onap.aai.champ.model.ChampObject;
45 import org.onap.aai.champ.model.ChampObjectConstraint;
46 import org.onap.aai.champ.model.ChampObjectIndex;
47 import org.onap.aai.champ.model.ChampPartition;
48 import org.onap.aai.champ.model.ChampRelationship;
49 import org.onap.aai.champ.model.ChampRelationshipConstraint;
50 import org.onap.aai.champ.model.ChampRelationshipIndex;
51 import org.onap.aai.champ.model.ChampSchema;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.att.nsa.cambria.client.CambriaClientBuilders;
56 import com.att.nsa.cambria.client.CambriaPublisher;
57
58
59 /**
60  * This class provides the hooks to allow Champ operations to be logged to an event
61  * stream.
62  */
63 public abstract class AbstractLoggingChampGraph implements ChampGraph {
64
65   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
66   
67   public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68   public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
69   public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
70   public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
71   public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
72   public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
73   public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
74   @Override
75   public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
76   public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
77   @Override
78   public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
79   @Override
80   public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
81   public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
82   public abstract void                             executeDeletePartition(ChampPartition graph);
83   public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
84   @Override
85   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
86   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
87   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
88   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
89   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
90   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
91   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
92   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
93   public abstract ChampSchema                      retrieveSchema();
94   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
95   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
96   public abstract void                             deleteSchema();
97   public abstract ChampCapabilities                capabilities();
98
99    
100   /** Configuration property for setting the comma-separated list of servers to use for
101    *  communicating with the event bus. */
102   public static final String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
103   
104   /** Configuration property for setting the number of events that we will try to 'batch' 
105    *  up before sending them to the event bus. */
106   public final static String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
107   public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
108   
109   /** Configuration property for setting the maximum amount of time to wait for a batch of
110    *  outgoing messages to fill up before sending the batch. */
111   public final static String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
112   public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
113   
114   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
115   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
116   
117   /** The event stream topic that we will publish Champ events to. */
118   public final static String EVENT_TOPIC = "champRawEvents";
119     
120   /** Number of events to 'batch up' before actually publishing them to the event bus. */
121   private Integer eventStreamBatchSize;
122   
123   private Integer eventStreamBatchTimeout;
124   
125   private Integer eventStreamPublisherPoolSize;
126   
127   /** Comma-separated list of hosts for connecting to the event bus. */
128   private String  eventStreamHosts = null;
129   
130   /** Client used for publishing messages to the event bus. */
131   protected CambriaPublisher producer;
132
133   /** Pool of worker threads that do the work of publishing the events to the event bus. */
134   protected ThreadPoolExecutor publisherPool;
135   
136   
137   /**
138    * Create a new instance of the AbstractLoggingChampGraph.
139    * 
140    * @param properties - Set of configuration properties for this graph instance.
141    */
142   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
143     
144     // Extract the necessary parameters from the configuration properties.
145     configure(properties);
146       
147     // Create the executor pool that will do the work of publishing events to the event bus.
148     publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
149     
150     // Make sure that we are actually provided a list of hosts for connecting to the event
151     // bus before we actually try to do anything useful.
152     if(eventStreamHosts == null) {
153       
154       // We were not supplied a list of event bus hosts, so just bail.
155       logger.error("Cannot initialize event stream publisher without at least one event bus host.");
156       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
157       return;
158     }
159          
160     try {
161         
162       // Instantiate the producer that we will use to publish events to the event stream.
163       setProducer(new CambriaClientBuilders.PublisherBuilder()
164                         .usingHosts(eventStreamHosts)
165                         .onTopic(EVENT_TOPIC)
166                         .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
167                         .build());
168       
169     } catch (MalformedURLException | GeneralSecurityException e) {
170       
171       logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
172       logger.error("NOTE: Champ events will NOT be published to the event stream");
173       producer = null;
174     }
175   }
176
177       
178   /**
179    * Process the configuration properties supplied for this graph instance.
180    * 
181    * @param properties - Configuration parameters.
182    */
183   private void configure(Map<String, Object> properties) {
184     
185     eventStreamBatchSize = 
186         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
187     eventStreamBatchTimeout = 
188         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
189     eventStreamPublisherPoolSize = 
190         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
191     
192     if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
193       eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
194     } 
195   }
196   
197   public void setProducer(CambriaPublisher aProducer) {
198     
199     producer = aProducer;
200   }
201   
202   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
203     
204     if(properties.containsKey(property)) {
205       return properties.get(property);
206     } else {
207       return defaultValue;
208     }
209   }
210   
211   @Override
212   public void shutdown() {
213     
214     if(publisherPool != null) {
215       publisherPool.shutdown();
216       
217       try {
218         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
219       } catch (InterruptedException e) {}
220     }
221     
222     if(producer != null) {
223       producer.close();
224     }
225   }
226   
227   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
228     
229     ChampObject storedObject = executeStoreObject(object);
230     
231     if(storedObject != null) {
232       
233       // Update the event stream with the current operation.
234       logEvent(ChampEvent.builder()
235                     .operation(ChampOperation.STORE)
236                     .entity(storedObject)
237                     .build());
238     }
239     
240     return storedObject;
241   }
242   
243   
244   public ChampObject replaceObject(ChampObject object)
245       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
246     
247     ChampObject replacedObject = executeReplaceObject(object);
248     
249     if(replacedObject != null) {
250       
251       // Update the event stream with the current operation.
252       logEvent(ChampEvent.builder()
253                     .operation(ChampOperation.REPLACE)
254                     .entity(replacedObject)
255                     .build());
256     }
257     
258     return replacedObject;
259   }
260   
261
262   public void deleteObject(Object key) throws ChampObjectNotExistsException {
263
264     // Retrieve the object that we are deleting before it's gone, so that we can 
265     // report it to the event stream.
266     Optional<ChampObject> objectToDelete = Optional.empty();
267     try {
268       objectToDelete = retrieveObject(key);
269       
270     } catch (ChampUnmarshallingException e) {
271       logger.error("Unable to generate delete object log: " + e.getMessage());
272     }
273     
274     executeDeleteObject(key);
275     
276     if(objectToDelete.isPresent()) {
277       // Update the event stream with the current operation.
278       logEvent(ChampEvent.builder()
279                     .operation(ChampOperation.DELETE)
280                     .entity(objectToDelete.get())
281                     .build());
282     }
283   }
284   
285   
286   public ChampRelationship storeRelationship(ChampRelationship relationship)
287       throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
288
289     ChampRelationship storedRelationship = executeStoreRelationship(relationship);
290     
291     if(storedRelationship != null) {
292       
293       // Update the event stream with the current operation.
294       logEvent(ChampEvent.builder()
295                     .operation(ChampOperation.STORE)
296                     .entity(storedRelationship)
297                     .build());
298     }
299     
300     return storedRelationship;
301   }
302   
303   
304   public ChampRelationship replaceRelationship(ChampRelationship relationship)
305       throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
306
307     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
308     
309     if(replacedRelationship != null) {
310       
311       // Update the event stream with the current operation.
312       logEvent(ChampEvent.builder()
313                     .operation(ChampOperation.REPLACE)
314                     .entity(replacedRelationship)
315                     .build());
316     }
317     
318     return replacedRelationship;
319   }
320   
321   
322   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
323
324     executeDeleteRelationship(relationship);
325     
326     // Update the event stream with the current operation.
327     logEvent(ChampEvent.builder()
328                   .operation(ChampOperation.DELETE)
329                   .entity(relationship)
330                   .build());
331   }
332   
333   
334   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
335
336     ChampPartition storedPartition = executeStorePartition(partition);
337     
338     if(storedPartition != null) {
339       
340       // Update the event stream with the current operation.
341       logEvent(ChampEvent.builder()
342                     .operation(ChampOperation.STORE)
343                     .entity(storedPartition)
344                     .build());
345     }
346     
347     return storedPartition;
348   }
349   
350   
351   public void deletePartition(ChampPartition graph) {
352
353     executeDeletePartition(graph);
354     
355     // Update the event stream with the current operation.
356     logEvent(ChampEvent.builder()
357                   .operation(ChampOperation.DELETE)
358                   .entity(graph)
359                   .build());
360   }
361   
362   
363   public void storeObjectIndex(ChampObjectIndex index) {
364
365     executeStoreObjectIndex(index);
366     
367     // Update the event stream with the current operation.
368     logEvent(ChampEvent.builder()
369                   .operation(ChampOperation.STORE)
370                   .entity(index)
371                   .build());
372   }
373   
374   
375   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
376     
377     // Retrieve the index that we are deleting before it's gone, so that we can 
378     // report it to the event stream.
379     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
380     
381     executeDeleteObjectIndex(indexName);
382     
383     if(indexToDelete.isPresent()) {
384       // Update the event stream with the current operation.
385       logEvent(ChampEvent.builder()
386                     .operation(ChampOperation.DELETE)
387                     .entity(indexToDelete.get()) 
388                     .build());
389     }
390   }
391   
392   
393   public void storeRelationshipIndex(ChampRelationshipIndex index) {
394
395     executeStoreRelationshipIndex(index);
396     
397     // Update the event stream with the current operation.
398     logEvent(ChampEvent.builder()
399                   .operation(ChampOperation.STORE)
400                   .entity(index) 
401                   .build());
402   }
403   
404   
405   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
406
407     // Retrieve the index that we are deleting before it's gone, so that we can 
408     // report it to the event stream.
409     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
410     
411     executeDeleteRelationshipIndex(indexName);
412     
413     if(indexToDelete.isPresent()) {
414       // Update the event stream with the current operation.
415       logEvent(ChampEvent.builder()
416                     .operation(ChampOperation.DELETE)
417                     .entity(indexToDelete.get()) 
418                     .build());
419     }
420   }
421   
422   
423   /**
424    * Submits an event to be published to the event stream.
425    * 
426    * @param anEvent - The event to be published.
427    */
428   public void logEvent(ChampEvent anEvent) {
429     
430     if(logger.isDebugEnabled()) {
431       logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
432     }
433     
434     try {
435       
436       // Try submitting the event to be published to the event bus.
437       publisherPool.execute(new EventPublisher(anEvent));
438     
439     } catch (RejectedExecutionException re) {
440       logger.error("Event could not be published to the event bus due to: " + re.getMessage());
441       
442     } catch (NullPointerException npe) {
443       logger.error("Can not publish null event to event bus.");
444     }
445   }
446   
447   
448   /**
449    * This class runs as a background thread and is responsible for pulling Champ events off
450    * of the internal queue and publishing them to the event stream.
451    */
452   private class EventPublisher implements Runnable {
453     
454     /** Partition key to use when publishing events to the event stream.  We WANT all events
455      *  to go to a single partition, so we are just using a hard-coded key for every event. */
456     private static final String EVENTS_PARTITION_KEY = "champEventKey";
457     
458     private ChampEvent event;
459     
460     public EventPublisher(ChampEvent event) {
461       this.event = event;
462     }
463     
464     
465     @Override
466     public void run() {
467
468       boolean done = false;
469       while(!done && !Thread.currentThread().isInterrupted()) {
470         try {
471           
472           // Make sure that we actually have a producer instance to use to publish
473           // the event with.
474           if(producer != null) {
475             
476             // Try publishing the event to the event bus.
477             producer.send(EVENTS_PARTITION_KEY, event.toJson());
478             
479           } else if (logger.isDebugEnabled()) {            
480             logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
481           }
482           
483           done = true;
484           
485         } catch (IOException e) {
486   
487           // We were unable to publish to the event bus, so wait a bit and then try
488           // again.
489           try {
490             Thread.sleep(500);
491             
492           } catch (InterruptedException e1) {
493             logger.info("Stopping event publisher worker thread.");
494             return;
495           }
496         }           
497       }
498     }
499   }
500 }