Add the @Override annotation
[aai/champ.git] / src / main / java / org / onap / aai / champ / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.event;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.Map;
28 import java.util.Optional;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Stream;
34
35 import org.onap.aai.champ.ChampCapabilities;
36 import org.onap.aai.champ.ChampGraph;
37 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
38 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champ.exceptions.ChampMarshallingException;
40 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
44 import org.onap.aai.champ.model.ChampObject;
45 import org.onap.aai.champ.model.ChampObjectConstraint;
46 import org.onap.aai.champ.model.ChampObjectIndex;
47 import org.onap.aai.champ.model.ChampPartition;
48 import org.onap.aai.champ.model.ChampRelationship;
49 import org.onap.aai.champ.model.ChampRelationshipConstraint;
50 import org.onap.aai.champ.model.ChampRelationshipIndex;
51 import org.onap.aai.champ.model.ChampSchema;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.att.nsa.cambria.client.CambriaClientBuilders;
56 import com.att.nsa.cambria.client.CambriaPublisher;
57
58
59 /**
60  * This class provides the hooks to allow Champ operations to be logged to an event
61  * stream.
62  */
63 public abstract class AbstractLoggingChampGraph implements ChampGraph {
64
65   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
66   
67   public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68   public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
69   public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
70   public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
71   public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
72   public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
73   public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
74   @Override
75   public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
76   public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
77   public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
78   public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
79   public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
80   public abstract void                             executeDeletePartition(ChampPartition graph);
81   public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
82   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
83   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
84   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
85   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
86   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
87   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
88   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
89   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
90   public abstract ChampSchema                      retrieveSchema();
91   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
92   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
93   public abstract void                             deleteSchema();
94   public abstract ChampCapabilities                capabilities();
95
96    
97   /** Configuration property for setting the comma-separated list of servers to use for
98    *  communicating with the event bus. */
99   public final static String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
100   
101   /** Configuration property for setting the number of events that we will try to 'batch' 
102    *  up before sending them to the event bus. */
103   public final static String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
104   public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
105   
106   /** Configuration property for setting the maximum amount of time to wait for a batch of
107    *  outgoing messages to fill up before sending the batch. */
108   public final static String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
109   public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
110   
111   public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
112   public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
113   
114   /** The event stream topic that we will publish Champ events to. */
115   public final static String EVENT_TOPIC = "champRawEvents";
116     
117   /** Number of events to 'batch up' before actually publishing them to the event bus. */
118   private Integer eventStreamBatchSize;
119   
120   private Integer eventStreamBatchTimeout;
121   
122   private Integer eventStreamPublisherPoolSize;
123   
124   /** Comma-separated list of hosts for connecting to the event bus. */
125   private String  eventStreamHosts = null;
126   
127   /** Client used for publishing messages to the event bus. */
128   protected CambriaPublisher producer;
129
130   /** Pool of worker threads that do the work of publishing the events to the event bus. */
131   protected ThreadPoolExecutor publisherPool;
132   
133   
134   /**
135    * Create a new instance of the AbstractLoggingChampGraph.
136    * 
137    * @param properties - Set of configuration properties for this graph instance.
138    */
139   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
140     
141     // Extract the necessary parameters from the configuration properties.
142     configure(properties);
143       
144     // Create the executor pool that will do the work of publishing events to the event bus.
145     publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
146     
147     // Make sure that we are actually provided a list of hosts for connecting to the event
148     // bus before we actually try to do anything useful.
149     if(eventStreamHosts == null) {
150       
151       // We were not supplied a list of event bus hosts, so just bail.
152       logger.error("Cannot initialize event stream publisher without at least one event bus host.");
153       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
154       return;
155     }
156          
157     try {
158         
159       // Instantiate the producer that we will use to publish events to the event stream.
160       setProducer(new CambriaClientBuilders.PublisherBuilder()
161                         .usingHosts(eventStreamHosts)
162                         .onTopic(EVENT_TOPIC)
163                         .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
164                         .build());
165       
166     } catch (MalformedURLException | GeneralSecurityException e) {
167       
168       logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
169       logger.error("NOTE: Champ events will NOT be published to the event stream");
170       producer = null;
171     }
172   }
173
174       
175   /**
176    * Process the configuration properties supplied for this graph instance.
177    * 
178    * @param properties - Configuration parameters.
179    */
180   private void configure(Map<String, Object> properties) {
181     
182     eventStreamBatchSize = 
183         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
184     eventStreamBatchTimeout = 
185         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
186     eventStreamPublisherPoolSize = 
187         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
188     
189     if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
190       eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
191     } 
192   }
193   
194   public void setProducer(CambriaPublisher aProducer) {
195     
196     producer = aProducer;
197   }
198   
199   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
200     
201     if(properties.containsKey(property)) {
202       return properties.get(property);
203     } else {
204       return defaultValue;
205     }
206   }
207   
208   @Override
209   public void shutdown() {
210     
211     if(publisherPool != null) {
212       publisherPool.shutdown();
213       
214       try {
215         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
216       } catch (InterruptedException e) {}
217     }
218     
219     if(producer != null) {
220       producer.close();
221     }
222   }
223   
224   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
225     
226     ChampObject storedObject = executeStoreObject(object);
227     
228     if(storedObject != null) {
229       
230       // Update the event stream with the current operation.
231       logEvent(ChampEvent.builder()
232                     .operation(ChampOperation.STORE)
233                     .entity(storedObject)
234                     .build());
235     }
236     
237     return storedObject;
238   }
239   
240   
241   public ChampObject replaceObject(ChampObject object)
242       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
243     
244     ChampObject replacedObject = executeReplaceObject(object);
245     
246     if(replacedObject != null) {
247       
248       // Update the event stream with the current operation.
249       logEvent(ChampEvent.builder()
250                     .operation(ChampOperation.REPLACE)
251                     .entity(replacedObject)
252                     .build());
253     }
254     
255     return replacedObject;
256   }
257   
258
259   public void deleteObject(Object key) throws ChampObjectNotExistsException {
260
261     // Retrieve the object that we are deleting before it's gone, so that we can 
262     // report it to the event stream.
263     Optional<ChampObject> objectToDelete = Optional.empty();
264     try {
265       objectToDelete = retrieveObject(key);
266       
267     } catch (ChampUnmarshallingException e) {
268       logger.error("Unable to generate delete object log: " + e.getMessage());
269     }
270     
271     executeDeleteObject(key);
272     
273     if(objectToDelete.isPresent()) {
274       // Update the event stream with the current operation.
275       logEvent(ChampEvent.builder()
276                     .operation(ChampOperation.DELETE)
277                     .entity(objectToDelete.get())
278                     .build());
279     }
280   }
281   
282   
283   public ChampRelationship storeRelationship(ChampRelationship relationship)
284       throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
285
286     ChampRelationship storedRelationship = executeStoreRelationship(relationship);
287     
288     if(storedRelationship != null) {
289       
290       // Update the event stream with the current operation.
291       logEvent(ChampEvent.builder()
292                     .operation(ChampOperation.STORE)
293                     .entity(storedRelationship)
294                     .build());
295     }
296     
297     return storedRelationship;
298   }
299   
300   
301   public ChampRelationship replaceRelationship(ChampRelationship relationship)
302       throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
303
304     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
305     
306     if(replacedRelationship != null) {
307       
308       // Update the event stream with the current operation.
309       logEvent(ChampEvent.builder()
310                     .operation(ChampOperation.REPLACE)
311                     .entity(replacedRelationship)
312                     .build());
313     }
314     
315     return replacedRelationship;
316   }
317   
318   
319   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
320
321     executeDeleteRelationship(relationship);
322     
323     // Update the event stream with the current operation.
324     logEvent(ChampEvent.builder()
325                   .operation(ChampOperation.DELETE)
326                   .entity(relationship)
327                   .build());
328   }
329   
330   
331   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
332
333     ChampPartition storedPartition = executeStorePartition(partition);
334     
335     if(storedPartition != null) {
336       
337       // Update the event stream with the current operation.
338       logEvent(ChampEvent.builder()
339                     .operation(ChampOperation.STORE)
340                     .entity(storedPartition)
341                     .build());
342     }
343     
344     return storedPartition;
345   }
346   
347   
348   public void deletePartition(ChampPartition graph) {
349
350     executeDeletePartition(graph);
351     
352     // Update the event stream with the current operation.
353     logEvent(ChampEvent.builder()
354                   .operation(ChampOperation.DELETE)
355                   .entity(graph)
356                   .build());
357   }
358   
359   
360   public void storeObjectIndex(ChampObjectIndex index) {
361
362     executeStoreObjectIndex(index);
363     
364     // Update the event stream with the current operation.
365     logEvent(ChampEvent.builder()
366                   .operation(ChampOperation.STORE)
367                   .entity(index)
368                   .build());
369   }
370   
371   
372   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
373     
374     // Retrieve the index that we are deleting before it's gone, so that we can 
375     // report it to the event stream.
376     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
377     
378     executeDeleteObjectIndex(indexName);
379     
380     if(indexToDelete.isPresent()) {
381       // Update the event stream with the current operation.
382       logEvent(ChampEvent.builder()
383                     .operation(ChampOperation.DELETE)
384                     .entity(indexToDelete.get()) 
385                     .build());
386     }
387   }
388   
389   
390   public void storeRelationshipIndex(ChampRelationshipIndex index) {
391
392     executeStoreRelationshipIndex(index);
393     
394     // Update the event stream with the current operation.
395     logEvent(ChampEvent.builder()
396                   .operation(ChampOperation.STORE)
397                   .entity(index) 
398                   .build());
399   }
400   
401   
402   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
403
404     // Retrieve the index that we are deleting before it's gone, so that we can 
405     // report it to the event stream.
406     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
407     
408     executeDeleteRelationshipIndex(indexName);
409     
410     if(indexToDelete.isPresent()) {
411       // Update the event stream with the current operation.
412       logEvent(ChampEvent.builder()
413                     .operation(ChampOperation.DELETE)
414                     .entity(indexToDelete.get()) 
415                     .build());
416     }
417   }
418   
419   
420   /**
421    * Submits an event to be published to the event stream.
422    * 
423    * @param anEvent - The event to be published.
424    */
425   public void logEvent(ChampEvent anEvent) {
426     
427     if(logger.isDebugEnabled()) {
428       logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
429     }
430     
431     try {
432       
433       // Try submitting the event to be published to the event bus.
434       publisherPool.execute(new EventPublisher(anEvent));
435     
436     } catch (RejectedExecutionException re) {
437       logger.error("Event could not be published to the event bus due to: " + re.getMessage());
438       
439     } catch (NullPointerException npe) {
440       logger.error("Can not publish null event to event bus.");
441     }
442   }
443   
444   
445   /**
446    * This class runs as a background thread and is responsible for pulling Champ events off
447    * of the internal queue and publishing them to the event stream.
448    */
449   private class EventPublisher implements Runnable {
450     
451     /** Partition key to use when publishing events to the event stream.  We WANT all events
452      *  to go to a single partition, so we are just using a hard-coded key for every event. */
453     private static final String EVENTS_PARTITION_KEY = "champEventKey";
454     
455     private ChampEvent event;
456     
457     public EventPublisher(ChampEvent event) {
458       this.event = event;
459     }
460     
461     
462     @Override
463     public void run() {
464
465       boolean done = false;
466       while(!done && !Thread.currentThread().isInterrupted()) {
467         try {
468           
469           // Make sure that we actually have a producer instance to use to publish
470           // the event with.
471           if(producer != null) {
472             
473             // Try publishing the event to the event bus.
474             producer.send(EVENTS_PARTITION_KEY, event.toJson());
475             
476           } else if (logger.isDebugEnabled()) {            
477             logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
478           }
479           
480           done = true;
481           
482         } catch (IOException e) {
483   
484           // We were unable to publish to the event bus, so wait a bit and then try
485           // again.
486           try {
487             Thread.sleep(500);
488             
489           } catch (InterruptedException e1) {
490             logger.info("Stopping event publisher worker thread.");
491             return;
492           }
493         }           
494       }
495     }
496   }
497 }