312a6e4cf4fe899c79fcf1e4ed180c0c3b818f26
[aai/champ.git] / src / main / java / org / onap / aai / champ / event / AbstractLoggingChampGraph.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.aai.champ.event;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.Map;
28 import java.util.Optional;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Stream;
34
35 import org.onap.aai.champ.ChampCapabilities;
36 import org.onap.aai.champ.ChampGraph;
37 import org.onap.aai.champ.event.ChampEvent.ChampOperation;
38 import org.onap.aai.champ.exceptions.ChampIndexNotExistsException;
39 import org.onap.aai.champ.exceptions.ChampMarshallingException;
40 import org.onap.aai.champ.exceptions.ChampObjectNotExistsException;
41 import org.onap.aai.champ.exceptions.ChampRelationshipNotExistsException;
42 import org.onap.aai.champ.exceptions.ChampSchemaViolationException;
43 import org.onap.aai.champ.exceptions.ChampUnmarshallingException;
44 import org.onap.aai.champ.model.ChampObject;
45 import org.onap.aai.champ.model.ChampObjectConstraint;
46 import org.onap.aai.champ.model.ChampObjectIndex;
47 import org.onap.aai.champ.model.ChampPartition;
48 import org.onap.aai.champ.model.ChampRelationship;
49 import org.onap.aai.champ.model.ChampRelationshipConstraint;
50 import org.onap.aai.champ.model.ChampRelationshipIndex;
51 import org.onap.aai.champ.model.ChampSchema;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import com.att.nsa.cambria.client.CambriaClientBuilders;
56 import com.att.nsa.cambria.client.CambriaPublisher;
57
58
59 /**
60  * This class provides the hooks to allow Champ operations to be logged to an event
61  * stream.
62  */
63 public abstract class AbstractLoggingChampGraph implements ChampGraph {
64
65   private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
66   
67   public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68   public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
69   public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
70   public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
71   public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
72   public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;  
73   public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException; 
74   @Override
75   public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
76   public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
77   @Override
78   public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
79   @Override
80   public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
81   public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
82   public abstract void                             executeDeletePartition(ChampPartition graph);
83   public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
84   @Override
85   public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
86   @Override
87   public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
88   public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
89   public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
90   @Override
91   public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
92   public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
93   public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
94   public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
95   public abstract ChampSchema                      retrieveSchema();
96   public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
97   public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
98   public abstract void                             deleteSchema();
99   public abstract ChampCapabilities                capabilities();
100
101    
102   /** Configuration property for setting the comma-separated list of servers to use for
103    *  communicating with the event bus. */
104   public static final String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";
105   
106   /** Configuration property for setting the number of events that we will try to 'batch' 
107    *  up before sending them to the event bus. */
108   public static final String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
109   public static final Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
110   
111   /** Configuration property for setting the maximum amount of time to wait for a batch of
112    *  outgoing messages to fill up before sending the batch. */
113   public static final String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
114   public static final Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500; 
115   
116   public static final String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
117   public static final Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
118   
119   /** The event stream topic that we will publish Champ events to. */
120   public static final String EVENT_TOPIC = "champRawEvents";
121     
122   /** Number of events to 'batch up' before actually publishing them to the event bus. */
123   private Integer eventStreamBatchSize;
124   
125   private Integer eventStreamBatchTimeout;
126   
127   private Integer eventStreamPublisherPoolSize;
128   
129   /** Comma-separated list of hosts for connecting to the event bus. */
130   private String  eventStreamHosts = null;
131   
132   /** Client used for publishing messages to the event bus. */
133   protected CambriaPublisher producer;
134
135   /** Pool of worker threads that do the work of publishing the events to the event bus. */
136   protected ThreadPoolExecutor publisherPool;
137   
138   
139   /**
140    * Create a new instance of the AbstractLoggingChampGraph.
141    * 
142    * @param properties - Set of configuration properties for this graph instance.
143    */
144   protected AbstractLoggingChampGraph(Map<String, Object> properties) {
145     
146     // Extract the necessary parameters from the configuration properties.
147     configure(properties);
148       
149     // Create the executor pool that will do the work of publishing events to the event bus.
150     publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);
151     
152     // Make sure that we are actually provided a list of hosts for connecting to the event
153     // bus before we actually try to do anything useful.
154     if(eventStreamHosts == null) {
155       
156       // We were not supplied a list of event bus hosts, so just bail.
157       logger.error("Cannot initialize event stream publisher without at least one event bus host.");
158       logger.error("NOTE!! Champ events will NOT be published to the event stream!");
159       return;
160     }
161          
162     try {
163         
164       // Instantiate the producer that we will use to publish events to the event stream.
165       setProducer(new CambriaClientBuilders.PublisherBuilder()
166                         .usingHosts(eventStreamHosts)
167                         .onTopic(EVENT_TOPIC)
168                         .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
169                         .build());
170       
171     } catch (MalformedURLException | GeneralSecurityException e) {
172       
173       logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
174       logger.error("NOTE: Champ events will NOT be published to the event stream");
175       producer = null;
176     }
177   }
178
179       
180   /**
181    * Process the configuration properties supplied for this graph instance.
182    * 
183    * @param properties - Configuration parameters.
184    */
185   private void configure(Map<String, Object> properties) {
186     
187     eventStreamBatchSize = 
188         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
189     eventStreamBatchTimeout = 
190         (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
191     eventStreamPublisherPoolSize = 
192         (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
193     
194     if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
195       eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
196     } 
197   }
198   
199   public void setProducer(CambriaPublisher aProducer) {
200     
201     producer = aProducer;
202   }
203   
204   private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
205     
206     if(properties.containsKey(property)) {
207       return properties.get(property);
208     } else {
209       return defaultValue;
210     }
211   }
212   
213   @Override
214   public void shutdown() {
215     
216     if(publisherPool != null) {
217       publisherPool.shutdown();
218       
219       try {
220         publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
221       } catch (InterruptedException e) {}
222     }
223     
224     if(producer != null) {
225       producer.close();
226     }
227   }
228   
229   public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
230     
231     ChampObject storedObject = executeStoreObject(object);
232     
233     if(storedObject != null) {
234       
235       // Update the event stream with the current operation.
236       logEvent(ChampEvent.builder()
237                     .operation(ChampOperation.STORE)
238                     .entity(storedObject)
239                     .build());
240     }
241     
242     return storedObject;
243   }
244   
245   
246   public ChampObject replaceObject(ChampObject object)
247       throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
248     
249     ChampObject replacedObject = executeReplaceObject(object);
250     
251     if(replacedObject != null) {
252       
253       // Update the event stream with the current operation.
254       logEvent(ChampEvent.builder()
255                     .operation(ChampOperation.REPLACE)
256                     .entity(replacedObject)
257                     .build());
258     }
259     
260     return replacedObject;
261   }
262   
263
264   public void deleteObject(Object key) throws ChampObjectNotExistsException {
265
266     // Retrieve the object that we are deleting before it's gone, so that we can 
267     // report it to the event stream.
268     Optional<ChampObject> objectToDelete = Optional.empty();
269     try {
270       objectToDelete = retrieveObject(key);
271       
272     } catch (ChampUnmarshallingException e) {
273       logger.error("Unable to generate delete object log: " + e.getMessage(),e);
274     }
275     
276     executeDeleteObject(key);
277     
278     if(objectToDelete.isPresent()) {
279       // Update the event stream with the current operation.
280       logEvent(ChampEvent.builder()
281                     .operation(ChampOperation.DELETE)
282                     .entity(objectToDelete.get())
283                     .build());
284     }
285   }
286   
287   
288   public ChampRelationship storeRelationship(ChampRelationship relationship)
289       throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {  
290
291     ChampRelationship storedRelationship = executeStoreRelationship(relationship);
292     
293     if(storedRelationship != null) {
294       
295       // Update the event stream with the current operation.
296       logEvent(ChampEvent.builder()
297                     .operation(ChampOperation.STORE)
298                     .entity(storedRelationship)
299                     .build());
300     }
301     
302     return storedRelationship;
303   }
304   
305   
306   public ChampRelationship replaceRelationship(ChampRelationship relationship)
307       throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException { 
308
309     ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
310     
311     if(replacedRelationship != null) {
312       
313       // Update the event stream with the current operation.
314       logEvent(ChampEvent.builder()
315                     .operation(ChampOperation.REPLACE)
316                     .entity(replacedRelationship)
317                     .build());
318     }
319     
320     return replacedRelationship;
321   }
322   
323   
324   public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
325
326     executeDeleteRelationship(relationship);
327     
328     // Update the event stream with the current operation.
329     logEvent(ChampEvent.builder()
330                   .operation(ChampOperation.DELETE)
331                   .entity(relationship)
332                   .build());
333   }
334   
335   
336   public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
337
338     ChampPartition storedPartition = executeStorePartition(partition);
339     
340     if(storedPartition != null) {
341       
342       // Update the event stream with the current operation.
343       logEvent(ChampEvent.builder()
344                     .operation(ChampOperation.STORE)
345                     .entity(storedPartition)
346                     .build());
347     }
348     
349     return storedPartition;
350   }
351   
352   
353   public void deletePartition(ChampPartition graph) {
354
355     executeDeletePartition(graph);
356     
357     // Update the event stream with the current operation.
358     logEvent(ChampEvent.builder()
359                   .operation(ChampOperation.DELETE)
360                   .entity(graph)
361                   .build());
362   }
363   
364   
365   public void storeObjectIndex(ChampObjectIndex index) {
366
367     executeStoreObjectIndex(index);
368     
369     // Update the event stream with the current operation.
370     logEvent(ChampEvent.builder()
371                   .operation(ChampOperation.STORE)
372                   .entity(index)
373                   .build());
374   }
375   
376   
377   public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
378     
379     // Retrieve the index that we are deleting before it's gone, so that we can 
380     // report it to the event stream.
381     Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
382     
383     executeDeleteObjectIndex(indexName);
384     
385     if(indexToDelete.isPresent()) {
386       // Update the event stream with the current operation.
387       logEvent(ChampEvent.builder()
388                     .operation(ChampOperation.DELETE)
389                     .entity(indexToDelete.get()) 
390                     .build());
391     }
392   }
393   
394   
395   public void storeRelationshipIndex(ChampRelationshipIndex index) {
396
397     executeStoreRelationshipIndex(index);
398     
399     // Update the event stream with the current operation.
400     logEvent(ChampEvent.builder()
401                   .operation(ChampOperation.STORE)
402                   .entity(index) 
403                   .build());
404   }
405   
406   
407   public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
408
409     // Retrieve the index that we are deleting before it's gone, so that we can 
410     // report it to the event stream.
411     Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
412     
413     executeDeleteRelationshipIndex(indexName);
414     
415     if(indexToDelete.isPresent()) {
416       // Update the event stream with the current operation.
417       logEvent(ChampEvent.builder()
418                     .operation(ChampOperation.DELETE)
419                     .entity(indexToDelete.get()) 
420                     .build());
421     }
422   }
423   
424   
425   /**
426    * Submits an event to be published to the event stream.
427    * 
428    * @param anEvent - The event to be published.
429    */
430   public void logEvent(ChampEvent anEvent) {
431     
432     if(logger.isDebugEnabled()) {
433       logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
434     }
435     
436     try {
437       
438       // Try submitting the event to be published to the event bus.
439       publisherPool.execute(new EventPublisher(anEvent));
440     
441     } catch (RejectedExecutionException re) {
442       logger.error("Event could not be published to the event bus due to: " + re.getMessage(),re);
443       
444     } catch (NullPointerException npe) {
445       logger.error("Can not publish null event to event bus." + npe.getMessage(),npe);
446     }
447   }
448   
449   
450   /**
451    * This class runs as a background thread and is responsible for pulling Champ events off
452    * of the internal queue and publishing them to the event stream.
453    */
454   private class EventPublisher implements Runnable {
455     
456     /** Partition key to use when publishing events to the event stream.  We WANT all events
457      *  to go to a single partition, so we are just using a hard-coded key for every event. */
458     private static final String EVENTS_PARTITION_KEY = "champEventKey";
459     
460     private ChampEvent event;
461     
462     public EventPublisher(ChampEvent event) {
463       this.event = event;
464     }
465     
466     
467     @Override
468     public void run() {
469
470       boolean done = false;
471       while(!done && !Thread.currentThread().isInterrupted()) {
472         try {
473           
474           // Make sure that we actually have a producer instance to use to publish
475           // the event with.
476           if(producer != null) {
477             
478             // Try publishing the event to the event bus.
479             producer.send(EVENTS_PARTITION_KEY, event.toJson());
480             
481           } else if (logger.isDebugEnabled()) {            
482             logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
483           }
484           
485           done = true;
486           
487         } catch (IOException e) {
488   
489           // We were unable to publish to the event bus, so wait a bit and then try
490           // again.
491           try {
492             Thread.sleep(500);
493             
494           } catch (InterruptedException e1) {
495             logger.info("Stopping event publisher worker thread.");
496             return;
497           }
498         }           
499       }
500     }
501   }
502 }