16fc00b8f05147c36676c30a2732d1c31f666dc0
[aai/champ.git] / src / main / java / org / onap / aai / champ / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.aai.champ.event;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.security.GeneralSecurityException;
26 import java.util.Map;
27 import java.util.Optional;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.RejectedExecutionException;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Stream;
33
34 import org.onap.aai.champ.ChampCapabilities;
35 import org.onap.aai.champ.ChampGraph;
36 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
37 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
38 import org.onap.aai.champ.exceptions.ChampMarshallingException;
39 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
40 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
42 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
43 import org.onap.aai.champ.model.ChampObject;
44 import org.onap.aai.champ.model.ChampObjectConstraint;
45 import org.onap.aai.champ.model.ChampObjectIndex;
46 import org.onap.aai.champ.model.ChampPartition;
47 import org.onap.aai.champ.model.ChampRelationship;
48 import org.onap.aai.champ.model.ChampRelationshipConstraint;
49 import org.onap.aai.champ.model.ChampRelationshipIndex;
50 import org.onap.aai.champ.model.ChampSchema;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 import com.att.nsa.cambria.client.CambriaClientBuilders;
55 import com.att.nsa.cambria.client.CambriaPublisher;
56
57
58 /**
59  * This class provides the hooks to allow Champ operations to be logged to an event
60  * stream.
61  */
62 public abstract class AbstractLoggingChampGraph implements ChampGraph {
63
64   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
65   
66   public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
67   public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68   public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
69   public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
70   public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
71   public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
72   public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
73   @Override
74   public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
75   public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
76   @Override
77   public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
78   @Override
79   public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
80   public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
81   public abstract void                             executeDeletePartition(ChampPartition graph);
82   public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
83   @Override
84   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
85   @Override
86   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
87   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
88   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
89   @Override
90   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
91   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
92   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
93   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
94   public abstract ChampSchema                      retrieveSchema();
95   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
96   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
97   public abstract void                             deleteSchema();
98   public abstract ChampCapabilities                capabilities();
99
100    
101   /** Configuration property for setting the comma-separated list of servers to use for
102    *  communicating with the event bus. */
103   public static final String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
104   
105   /** Configuration property for setting the number of events that we will try to 'batch' 
106    *  up before sending them to the event bus. */
107   public static final String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
108   public static final Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
109   
110   /** Configuration property for setting the maximum amount of time to wait for a batch of
111    *  outgoing messages to fill up before sending the batch. */
112   public static final String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
113   public static final Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
114   
115   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
116   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
117   
118   /** The event stream topic that we will publish Champ events to. */
119   public static final String EVENT_TOPIC = "champRawEvents";
120     
121   /** Number of events to 'batch up' before actually publishing them to the event bus. */
122   private Integer eventStreamBatchSize;
123   
124   private Integer eventStreamBatchTimeout;
125   
126   private Integer eventStreamPublisherPoolSize;
127   
128   /** Comma-separated list of hosts for connecting to the event bus. */
129   private String  eventStreamHosts = null;
130   
131   /** Client used for publishing messages to the event bus. */
132   protected CambriaPublisher producer;
133
134   /** Pool of worker threads that do the work of publishing the events to the event bus. */
135   protected ThreadPoolExecutor publisherPool;
136   
137   
138   /**
139    * Create a new instance of the AbstractLoggingChampGraph.
140    * 
141    * @param properties - Set of configuration properties for this graph instance.
142    */
143   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
144     
145     // Extract the necessary parameters from the configuration properties.
146     configure(properties);
147       
148     // Create the executor pool that will do the work of publishing events to the event bus.
149     publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
150     
151     // Make sure that we are actually provided a list of hosts for connecting to the event
152     // bus before we actually try to do anything useful.
153     if(eventStreamHosts == null) {
154       
155       // We were not supplied a list of event bus hosts, so just bail.
156       logger.error("Cannot initialize event stream publisher without at least one event bus host.");
157       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
158       return;
159     }
160          
161     try {
162         
163       // Instantiate the producer that we will use to publish events to the event stream.
164       setProducer(new CambriaClientBuilders.PublisherBuilder()
165                         .usingHosts(eventStreamHosts)
166                         .onTopic(EVENT_TOPIC)
167                         .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
168                         .build());
169       
170     } catch (MalformedURLException | GeneralSecurityException e) {
171       
172       logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
173       logger.error("NOTE: Champ events will NOT be published to the event stream");
174       producer = null;
175     }
176   }
177
178       
179   /**
180    * Process the configuration properties supplied for this graph instance.
181    * 
182    * @param properties - Configuration parameters.
183    */
184   private void configure(Map<String, Object> properties) {
185     
186     eventStreamBatchSize = 
187         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
188     eventStreamBatchTimeout = 
189         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
190     eventStreamPublisherPoolSize = 
191         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
192     
193     if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
194       eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
195     } 
196   }
197   
198   public void setProducer(CambriaPublisher aProducer) {
199     
200     producer = aProducer;
201   }
202   
203   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
204     
205     if(properties.containsKey(property)) {
206       return properties.get(property);
207     } else {
208       return defaultValue;
209     }
210   }
211   
212   @Override
213   public void shutdown() {
214     
215     if(publisherPool != null) {
216       publisherPool.shutdown();
217       
218       try {
219         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
220       } catch (InterruptedException e) {}
221     }
222     
223     if(producer != null) {
224       producer.close();
225     }
226   }
227   
228   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
229     
230     ChampObject storedObject = executeStoreObject(object);
231     
232     if(storedObject != null) {
233       
234       // Update the event stream with the current operation.
235       logEvent(ChampEvent.builder()
236                     .operation(ChampOperation.STORE)
237                     .entity(storedObject)
238                     .build());
239     }
240     
241     return storedObject;
242   }
243   
244   
245   public ChampObject replaceObject(ChampObject object)
246       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
247     
248     ChampObject replacedObject = executeReplaceObject(object);
249     
250     if(replacedObject != null) {
251       
252       // Update the event stream with the current operation.
253       logEvent(ChampEvent.builder()
254                     .operation(ChampOperation.REPLACE)
255                     .entity(replacedObject)
256                     .build());
257     }
258     
259     return replacedObject;
260   }
261   
262
263   public void deleteObject(Object key) throws ChampObjectNotExistsException {
264
265     // Retrieve the object that we are deleting before it's gone, so that we can 
266     // report it to the event stream.
267     Optional<ChampObject> objectToDelete = Optional.empty();
268     try {
269       objectToDelete = retrieveObject(key);
270       
271     } catch (ChampUnmarshallingException e) {
272       logger.error("Unable to generate delete object log: " + e.getMessage(),e);
273     }
274     
275     executeDeleteObject(key);
276     
277     if(objectToDelete.isPresent()) {
278       // Update the event stream with the current operation.
279       logEvent(ChampEvent.builder()
280                     .operation(ChampOperation.DELETE)
281                     .entity(objectToDelete.get())
282                     .build());
283     }
284   }
285   
286   
287   public ChampRelationship storeRelationship(ChampRelationship relationship)
288       throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
289
290     ChampRelationship storedRelationship = executeStoreRelationship(relationship);
291     
292     if(storedRelationship != null) {
293       
294       // Update the event stream with the current operation.
295       logEvent(ChampEvent.builder()
296                     .operation(ChampOperation.STORE)
297                     .entity(storedRelationship)
298                     .build());
299     }
300     
301     return storedRelationship;
302   }
303   
304   
305   public ChampRelationship replaceRelationship(ChampRelationship relationship)
306       throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
307
308     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
309     
310     if(replacedRelationship != null) {
311       
312       // Update the event stream with the current operation.
313       logEvent(ChampEvent.builder()
314                     .operation(ChampOperation.REPLACE)
315                     .entity(replacedRelationship)
316                     .build());
317     }
318     
319     return replacedRelationship;
320   }
321   
322   
323   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
324
325     executeDeleteRelationship(relationship);
326     
327     // Update the event stream with the current operation.
328     logEvent(ChampEvent.builder()
329                   .operation(ChampOperation.DELETE)
330                   .entity(relationship)
331                   .build());
332   }
333   
334   
335   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
336
337     ChampPartition storedPartition = executeStorePartition(partition);
338     
339     if(storedPartition != null) {
340       
341       // Update the event stream with the current operation.
342       logEvent(ChampEvent.builder()
343                     .operation(ChampOperation.STORE)
344                     .entity(storedPartition)
345                     .build());
346     }
347     
348     return storedPartition;
349   }
350   
351   
352   public void deletePartition(ChampPartition graph) {
353
354     executeDeletePartition(graph);
355     
356     // Update the event stream with the current operation.
357     logEvent(ChampEvent.builder()
358                   .operation(ChampOperation.DELETE)
359                   .entity(graph)
360                   .build());
361   }
362   
363   
364   public void storeObjectIndex(ChampObjectIndex index) {
365
366     executeStoreObjectIndex(index);
367     
368     // Update the event stream with the current operation.
369     logEvent(ChampEvent.builder()
370                   .operation(ChampOperation.STORE)
371                   .entity(index)
372                   .build());
373   }
374   
375   
376   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
377     
378     // Retrieve the index that we are deleting before it's gone, so that we can 
379     // report it to the event stream.
380     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
381     
382     executeDeleteObjectIndex(indexName);
383     
384     if(indexToDelete.isPresent()) {
385       // Update the event stream with the current operation.
386       logEvent(ChampEvent.builder()
387                     .operation(ChampOperation.DELETE)
388                     .entity(indexToDelete.get()) 
389                     .build());
390     }
391   }
392   
393   
394   public void storeRelationshipIndex(ChampRelationshipIndex index) {
395
396     executeStoreRelationshipIndex(index);
397     
398     // Update the event stream with the current operation.
399     logEvent(ChampEvent.builder()
400                   .operation(ChampOperation.STORE)
401                   .entity(index) 
402                   .build());
403   }
404   
405   
406   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
407
408     // Retrieve the index that we are deleting before it's gone, so that we can 
409     // report it to the event stream.
410     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
411     
412     executeDeleteRelationshipIndex(indexName);
413     
414     if(indexToDelete.isPresent()) {
415       // Update the event stream with the current operation.
416       logEvent(ChampEvent.builder()
417                     .operation(ChampOperation.DELETE)
418                     .entity(indexToDelete.get()) 
419                     .build());
420     }
421   }
422   
423   
424   /**
425    * Submits an event to be published to the event stream.
426    * 
427    * @param anEvent - The event to be published.
428    */
429   public void logEvent(ChampEvent anEvent) {
430     
431     if(logger.isDebugEnabled()) {
432       logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
433     }
434     
435     try {
436       
437       // Try submitting the event to be published to the event bus.
438       publisherPool.execute(new EventPublisher(anEvent));
439     
440     } catch (RejectedExecutionException re) {
441       logger.error("Event could not be published to the event bus due to: " + re.getMessage(),re);
442       
443     } catch (NullPointerException npe) {
444       logger.error("Can not publish null event to event bus." + npe.getMessage(),npe);
445     }
446   }
447   
448   
449   /**
450    * This class runs as a background thread and is responsible for pulling Champ events off
451    * of the internal queue and publishing them to the event stream.
452    */
453   private class EventPublisher implements Runnable {
454     
455     /** Partition key to use when publishing events to the event stream.  We WANT all events
456      *  to go to a single partition, so we are just using a hard-coded key for every event. */
457     private static final String EVENTS_PARTITION_KEY = "champEventKey";
458     
459     private ChampEvent event;
460     
461     public EventPublisher(ChampEvent event) {
462       this.event = event;
463     }
464     
465     
466     @Override
467     public void run() {
468
469       boolean done = false;
470       while(!done && !Thread.currentThread().isInterrupted()) {
471         try {
472           
473           // Make sure that we actually have a producer instance to use to publish
474           // the event with.
475           if(producer != null) {
476             
477             // Try publishing the event to the event bus.
478             producer.send(EVENTS_PARTITION_KEY, event.toJson());
479             
480           } else if (logger.isDebugEnabled()) {            
481             logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
482           }
483           
484           done = true;
485           
486         } catch (IOException e) {
487   
488           // We were unable to publish to the event bus, so wait a bit and then try
489           // again.
490           try {
491             Thread.sleep(500);
492             
493           } catch (InterruptedException e1) {
494             logger.info("Stopping event publisher worker thread.");
495             return;
496           }
497         }           
498       }
499     }
500   }
501 }