Merge "Add the @Override annotation"
[aai/champ.git] / src / main / java / org / onap / aai / champ / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.event;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.Map;
28 import java.util.Optional;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Stream;
34
35 import org.onap.aai.champ.ChampCapabilities;
36 import org.onap.aai.champ.ChampGraph;
37 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
38 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champ.exceptions.ChampMarshallingException;
40 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
44 import org.onap.aai.champ.model.ChampObject;
45 import org.onap.aai.champ.model.ChampObjectConstraint;
46 import org.onap.aai.champ.model.ChampObjectIndex;
47 import org.onap.aai.champ.model.ChampPartition;
48 import org.onap.aai.champ.model.ChampRelationship;
49 import org.onap.aai.champ.model.ChampRelationshipConstraint;
50 import org.onap.aai.champ.model.ChampRelationshipIndex;
51 import org.onap.aai.champ.model.ChampSchema;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.att.nsa.cambria.client.CambriaClientBuilders;
56 import com.att.nsa.cambria.client.CambriaPublisher;
57
58
59 /**
60  * This class provides the hooks to allow Champ operations to be logged to an event
61  * stream.
62  */
63 public abstract class AbstractLoggingChampGraph implements ChampGraph {
64
65   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
66   
67   public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68   public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
69   public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
70   public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
71   public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
72   public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
73   public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
74   @Override
75   public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
76   public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
77   @Override
78   public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
79   @Override
80   public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
81   public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
82   public abstract void                             executeDeletePartition(ChampPartition graph);
83   public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
84   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
85   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
86   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
87   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
88   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
89   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
90   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
91   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
92   public abstract ChampSchema                      retrieveSchema();
93   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
94   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
95   public abstract void                             deleteSchema();
96   public abstract ChampCapabilities                capabilities();
97
98    
99   /** Configuration property for setting the comma-separated list of servers to use for
100    *  communicating with the event bus. */
101   public static final String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
102   
103   /** Configuration property for setting the number of events that we will try to 'batch' 
104    *  up before sending them to the event bus. */
105   public final static String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
106   public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
107   
108   /** Configuration property for setting the maximum amount of time to wait for a batch of
109    *  outgoing messages to fill up before sending the batch. */
110   public final static String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
111   public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
112   
113   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
114   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
115   
116   /** The event stream topic that we will publish Champ events to. */
117   public final static String EVENT_TOPIC = "champRawEvents";
118     
119   /** Number of events to 'batch up' before actually publishing them to the event bus. */
120   private Integer eventStreamBatchSize;
121   
122   private Integer eventStreamBatchTimeout;
123   
124   private Integer eventStreamPublisherPoolSize;
125   
126   /** Comma-separated list of hosts for connecting to the event bus. */
127   private String  eventStreamHosts = null;
128   
129   /** Client used for publishing messages to the event bus. */
130   protected CambriaPublisher producer;
131
132   /** Pool of worker threads that do the work of publishing the events to the event bus. */
133   protected ThreadPoolExecutor publisherPool;
134   
135   
136   /**
137    * Create a new instance of the AbstractLoggingChampGraph.
138    * 
139    * @param properties - Set of configuration properties for this graph instance.
140    */
141   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
142     
143     // Extract the necessary parameters from the configuration properties.
144     configure(properties);
145       
146     // Create the executor pool that will do the work of publishing events to the event bus.
147     publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
148     
149     // Make sure that we are actually provided a list of hosts for connecting to the event
150     // bus before we actually try to do anything useful.
151     if(eventStreamHosts == null) {
152       
153       // We were not supplied a list of event bus hosts, so just bail.
154       logger.error("Cannot initialize event stream publisher without at least one event bus host.");
155       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
156       return;
157     }
158          
159     try {
160         
161       // Instantiate the producer that we will use to publish events to the event stream.
162       setProducer(new CambriaClientBuilders.PublisherBuilder()
163                         .usingHosts(eventStreamHosts)
164                         .onTopic(EVENT_TOPIC)
165                         .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
166                         .build());
167       
168     } catch (MalformedURLException | GeneralSecurityException e) {
169       
170       logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
171       logger.error("NOTE: Champ events will NOT be published to the event stream");
172       producer = null;
173     }
174   }
175
176       
177   /**
178    * Process the configuration properties supplied for this graph instance.
179    * 
180    * @param properties - Configuration parameters.
181    */
182   private void configure(Map<String, Object> properties) {
183     
184     eventStreamBatchSize = 
185         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
186     eventStreamBatchTimeout = 
187         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
188     eventStreamPublisherPoolSize = 
189         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
190     
191     if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
192       eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
193     } 
194   }
195   
196   public void setProducer(CambriaPublisher aProducer) {
197     
198     producer = aProducer;
199   }
200   
201   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
202     
203     if(properties.containsKey(property)) {
204       return properties.get(property);
205     } else {
206       return defaultValue;
207     }
208   }
209   
210   @Override
211   public void shutdown() {
212     
213     if(publisherPool != null) {
214       publisherPool.shutdown();
215       
216       try {
217         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
218       } catch (InterruptedException e) {}
219     }
220     
221     if(producer != null) {
222       producer.close();
223     }
224   }
225   
226   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
227     
228     ChampObject storedObject = executeStoreObject(object);
229     
230     if(storedObject != null) {
231       
232       // Update the event stream with the current operation.
233       logEvent(ChampEvent.builder()
234                     .operation(ChampOperation.STORE)
235                     .entity(storedObject)
236                     .build());
237     }
238     
239     return storedObject;
240   }
241   
242   
243   public ChampObject replaceObject(ChampObject object)
244       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
245     
246     ChampObject replacedObject = executeReplaceObject(object);
247     
248     if(replacedObject != null) {
249       
250       // Update the event stream with the current operation.
251       logEvent(ChampEvent.builder()
252                     .operation(ChampOperation.REPLACE)
253                     .entity(replacedObject)
254                     .build());
255     }
256     
257     return replacedObject;
258   }
259   
260
261   public void deleteObject(Object key) throws ChampObjectNotExistsException {
262
263     // Retrieve the object that we are deleting before it's gone, so that we can 
264     // report it to the event stream.
265     Optional<ChampObject> objectToDelete = Optional.empty();
266     try {
267       objectToDelete = retrieveObject(key);
268       
269     } catch (ChampUnmarshallingException e) {
270       logger.error("Unable to generate delete object log: " + e.getMessage());
271     }
272     
273     executeDeleteObject(key);
274     
275     if(objectToDelete.isPresent()) {
276       // Update the event stream with the current operation.
277       logEvent(ChampEvent.builder()
278                     .operation(ChampOperation.DELETE)
279                     .entity(objectToDelete.get())
280                     .build());
281     }
282   }
283   
284   
285   public ChampRelationship storeRelationship(ChampRelationship relationship)
286       throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
287
288     ChampRelationship storedRelationship = executeStoreRelationship(relationship);
289     
290     if(storedRelationship != null) {
291       
292       // Update the event stream with the current operation.
293       logEvent(ChampEvent.builder()
294                     .operation(ChampOperation.STORE)
295                     .entity(storedRelationship)
296                     .build());
297     }
298     
299     return storedRelationship;
300   }
301   
302   
303   public ChampRelationship replaceRelationship(ChampRelationship relationship)
304       throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
305
306     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
307     
308     if(replacedRelationship != null) {
309       
310       // Update the event stream with the current operation.
311       logEvent(ChampEvent.builder()
312                     .operation(ChampOperation.REPLACE)
313                     .entity(replacedRelationship)
314                     .build());
315     }
316     
317     return replacedRelationship;
318   }
319   
320   
321   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
322
323     executeDeleteRelationship(relationship);
324     
325     // Update the event stream with the current operation.
326     logEvent(ChampEvent.builder()
327                   .operation(ChampOperation.DELETE)
328                   .entity(relationship)
329                   .build());
330   }
331   
332   
333   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
334
335     ChampPartition storedPartition = executeStorePartition(partition);
336     
337     if(storedPartition != null) {
338       
339       // Update the event stream with the current operation.
340       logEvent(ChampEvent.builder()
341                     .operation(ChampOperation.STORE)
342                     .entity(storedPartition)
343                     .build());
344     }
345     
346     return storedPartition;
347   }
348   
349   
350   public void deletePartition(ChampPartition graph) {
351
352     executeDeletePartition(graph);
353     
354     // Update the event stream with the current operation.
355     logEvent(ChampEvent.builder()
356                   .operation(ChampOperation.DELETE)
357                   .entity(graph)
358                   .build());
359   }
360   
361   
362   public void storeObjectIndex(ChampObjectIndex index) {
363
364     executeStoreObjectIndex(index);
365     
366     // Update the event stream with the current operation.
367     logEvent(ChampEvent.builder()
368                   .operation(ChampOperation.STORE)
369                   .entity(index)
370                   .build());
371   }
372   
373   
374   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
375     
376     // Retrieve the index that we are deleting before it's gone, so that we can 
377     // report it to the event stream.
378     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
379     
380     executeDeleteObjectIndex(indexName);
381     
382     if(indexToDelete.isPresent()) {
383       // Update the event stream with the current operation.
384       logEvent(ChampEvent.builder()
385                     .operation(ChampOperation.DELETE)
386                     .entity(indexToDelete.get()) 
387                     .build());
388     }
389   }
390   
391   
392   public void storeRelationshipIndex(ChampRelationshipIndex index) {
393
394     executeStoreRelationshipIndex(index);
395     
396     // Update the event stream with the current operation.
397     logEvent(ChampEvent.builder()
398                   .operation(ChampOperation.STORE)
399                   .entity(index) 
400                   .build());
401   }
402   
403   
404   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
405
406     // Retrieve the index that we are deleting before it's gone, so that we can 
407     // report it to the event stream.
408     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
409     
410     executeDeleteRelationshipIndex(indexName);
411     
412     if(indexToDelete.isPresent()) {
413       // Update the event stream with the current operation.
414       logEvent(ChampEvent.builder()
415                     .operation(ChampOperation.DELETE)
416                     .entity(indexToDelete.get()) 
417                     .build());
418     }
419   }
420   
421   
422   /**
423    * Submits an event to be published to the event stream.
424    * 
425    * @param anEvent - The event to be published.
426    */
427   public void logEvent(ChampEvent anEvent) {
428     
429     if(logger.isDebugEnabled()) {
430       logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
431     }
432     
433     try {
434       
435       // Try submitting the event to be published to the event bus.
436       publisherPool.execute(new EventPublisher(anEvent));
437     
438     } catch (RejectedExecutionException re) {
439       logger.error("Event could not be published to the event bus due to: " + re.getMessage());
440       
441     } catch (NullPointerException npe) {
442       logger.error("Can not publish null event to event bus.");
443     }
444   }
445   
446   
447   /**
448    * This class runs as a background thread and is responsible for pulling Champ events off
449    * of the internal queue and publishing them to the event stream.
450    */
451   private class EventPublisher implements Runnable {
452     
453     /** Partition key to use when publishing events to the event stream.  We WANT all events
454      *  to go to a single partition, so we are just using a hard-coded key for every event. */
455     private static final String EVENTS_PARTITION_KEY = "champEventKey";
456     
457     private ChampEvent event;
458     
459     public EventPublisher(ChampEvent event) {
460       this.event = event;
461     }
462     
463     
464     @Override
465     public void run() {
466
467       boolean done = false;
468       while(!done && !Thread.currentThread().isInterrupted()) {
469         try {
470           
471           // Make sure that we actually have a producer instance to use to publish
472           // the event with.
473           if(producer != null) {
474             
475             // Try publishing the event to the event bus.
476             producer.send(EVENTS_PARTITION_KEY, event.toJson());
477             
478           } else if (logger.isDebugEnabled()) {            
479             logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
480           }
481           
482           done = true;
483           
484         } catch (IOException e) {
485   
486           // We were unable to publish to the event bus, so wait a bit and then try
487           // again.
488           try {
489             Thread.sleep(500);
490             
491           } catch (InterruptedException e1) {
492             logger.info("Stopping event publisher worker thread.");
493             return;
494           }
495         }           
496       }
497     }
498   }
499 }