Merge "Added @Override annotation above signature"
[aai/champ.git] / src/main/java/org/openecomp/aai/champ/event/AbstractLoggingChampGraph.java
/**
 * ============LICENSE_START==========================================
 * org.onap.aai
 * ===================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017 Amdocs
 * ===================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END============================================
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
package org.openecomp.aai.champ.event;

import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.openecomp.aai.champ.ChampCapabilities;
import org.openecomp.aai.champ.ChampGraph;
import org.openecomp.aai.champ.event.ChampEvent.ChampOperation;
import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampMarshallingException;
import org.openecomp.aai.champ.exceptions.ChampObjectNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampRelationshipNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
import org.openecomp.aai.champ.exceptions.ChampUnmarshallingException;
import org.openecomp.aai.champ.model.ChampObject;
import org.openecomp.aai.champ.model.ChampObjectConstraint;
import org.openecomp.aai.champ.model.ChampObjectIndex;
import org.openecomp.aai.champ.model.ChampPartition;
import org.openecomp.aai.champ.model.ChampRelationship;
import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
import org.openecomp.aai.champ.model.ChampRelationshipIndex;
import org.openecomp.aai.champ.model.ChampSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaPublisher;


/**
 * This class provides the hooks to allow Champ operations to be logged to an event
 * stream.
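 *
 * <p>Concrete subclasses supply the underlying graph operations (the {@code execute*} and
 * {@code retrieve*} methods declared below), while this base class publishes a {@link ChampEvent}
 * describing each successful mutation.  As a rough usage sketch (the subclass name and the
 * property values here are purely illustrative, not defaults):
 *
 * <pre>{@code
 *   Map<String, Object> props = new HashMap<>();
 *   props.put(AbstractLoggingChampGraph.PARAM_EVENT_STREAM_HOSTS, "host1:3904,host2:3904");
 *   props.put(AbstractLoggingChampGraph.PARAM_EVENT_STREAM_BATCH_SIZE, 10);
 *
 *   // MyConcreteChampGraph is a hypothetical subclass of AbstractLoggingChampGraph.
 *   AbstractLoggingChampGraph graph = new MyConcreteChampGraph(props);
 *   graph.storeObject(someObject);  // someObject is stored, then logged to the event stream
 * }</pre>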
 */
public abstract class AbstractLoggingChampGraph implements ChampGraph {

  private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);

  public abstract ChampObject                      executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
  public abstract ChampObject                      executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
  public abstract Optional<ChampObject>            retrieveObject(Object key) throws ChampUnmarshallingException;
  public abstract void                             executeDeleteObject(Object key) throws ChampObjectNotExistsException;
  public abstract Stream<ChampObject>              queryObjects(Map<String, Object> queryParams);
  public abstract ChampRelationship                executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;
  public abstract ChampRelationship                executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException;
  public abstract Optional<ChampRelationship>      retrieveRelationship(Object key) throws ChampUnmarshallingException;
  public abstract void                             executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
  public abstract Stream<ChampRelationship>        retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
  public abstract Stream<ChampRelationship>        queryRelationships(Map<String, Object> queryParams);
  public abstract ChampPartition                   executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
  public abstract void                             executeDeletePartition(ChampPartition graph);
  public abstract void                             executeStoreObjectIndex(ChampObjectIndex index);
  public abstract Optional<ChampObjectIndex>       retrieveObjectIndex(String indexName);
  public abstract Stream<ChampObjectIndex>         retrieveObjectIndices();
  public abstract void                             executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
  public abstract void                             executeStoreRelationshipIndex(ChampRelationshipIndex index);
  public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
  public abstract Stream<ChampRelationshipIndex>   retrieveRelationshipIndices();
  public abstract void                             executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
  public abstract void                             storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
  public abstract ChampSchema                      retrieveSchema();
  public abstract void                             updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
  public abstract void                             updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
  public abstract void                             deleteSchema();
  public abstract ChampCapabilities                capabilities();


  /** Configuration property for setting the comma-separated list of servers to use for
   *  communicating with the event bus. */
  public final static String  PARAM_EVENT_STREAM_HOSTS      = "champ.event.stream.hosts";

  /** Configuration property for setting the number of events that we will try to 'batch'
   *  up before sending them to the event bus. */
  public final static String  PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
  public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;

  /** Configuration property for setting the maximum amount of time to wait for a batch of
   *  outgoing messages to fill up before sending the batch. */
  public final static String  PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
  public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500;

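  /** Configuration property for setting the number of worker threads used to publish
   *  events to the event bus. */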
  public final static String  PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
  public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;

  /** The event stream topic that we will publish Champ events to. */
  public final static String EVENT_TOPIC = "champRawEvents";

  /** Number of events to 'batch up' before actually publishing them to the event bus. */
  private Integer eventStreamBatchSize;

  /** Maximum amount of time (in ms) to wait for a batch to fill up before sending it. */
  private Integer eventStreamBatchTimeout;

  /** Number of worker threads in the pool used to publish events to the event bus. */
  private Integer eventStreamPublisherPoolSize;

  /** Comma-separated list of hosts for connecting to the event bus. */
  private String  eventStreamHosts = null;

  /** Client used for publishing messages to the event bus. */
  protected CambriaPublisher producer;

  /** Pool of worker threads that do the work of publishing the events to the event bus. */
  protected ThreadPoolExecutor publisherPool;


  /**
   * Create a new instance of the AbstractLoggingChampGraph.
   *
   * @param properties - Set of configuration properties for this graph instance.
   */
  protected AbstractLoggingChampGraph(Map<String, Object> properties) {

    // Extract the necessary parameters from the configuration properties.
    configure(properties);

    // Create the executor pool that will do the work of publishing events to the event bus.
    publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);

    // Make sure that we are actually provided a list of hosts for connecting to the event
    // bus before we actually try to do anything useful.
    if(eventStreamHosts == null) {

      // We were not supplied a list of event bus hosts, so just bail.
      logger.error("Cannot initialize event stream publisher without at least one event bus host.");
      logger.error("NOTE!! Champ events will NOT be published to the event stream!");
      return;
    }

    try {

      // Instantiate the producer that we will use to publish events to the event stream.
      setProducer(new CambriaClientBuilders.PublisherBuilder()
                        .usingHosts(eventStreamHosts)
                        .onTopic(EVENT_TOPIC)
                        .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
                        .build());

    } catch (MalformedURLException | GeneralSecurityException e) {

      logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
      logger.error("NOTE: Champ events will NOT be published to the event stream");
      producer = null;
    }
  }

  /**
   * Process the configuration properties supplied for this graph instance.
   *
   * @param properties - Configuration parameters.
   */
  private void configure(Map<String, Object> properties) {

    eventStreamBatchSize =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE,    DEFAULT_EVENT_STREAM_BATCH_SIZE);
    eventStreamBatchTimeout =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
    eventStreamPublisherPoolSize =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);

    if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
      eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
    }
  }

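  /**
   * Assigns the client that will be used to publish messages to the event bus.
   *
   * @param aProducer - The event bus publisher client to use.
   */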
  public void setProducer(CambriaPublisher aProducer) {

    producer = aProducer;
  }

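  /**
   * Looks up a configuration property, falling back to a default value if the property
   * was not supplied.
   *
   * @param properties   - Set of configuration properties.
   * @param property     - The name of the property to look up.
   * @param defaultValue - The value to return if the property is not set.
   *
   * @return - The configured value if present, otherwise the default.
   */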
  private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {

    if(properties.containsKey(property)) {
      return properties.get(property);
    } else {
      return defaultValue;
    }
  }

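  /**
   * Shuts down the publisher thread pool and closes the connection to the event bus.
   */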
  @Override
  public void shutdown() {

    if(publisherPool != null) {
      publisherPool.shutdown();

      try {
        publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Preserve the interrupt status so that callers can see that we were interrupted.
        Thread.currentThread().interrupt();
      }
    }

    if(producer != null) {
      producer.close();
    }
  }

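  /**
   * Stores an object in the graph and, if the store succeeds, publishes a STORE event
   * for it to the event stream.
   *
   * @param object - The object to be stored.
   *
   * @return - The object as it was stored by the underlying graph implementation.
   */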
  public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {

    ChampObject storedObject = executeStoreObject(object);

    if(storedObject != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.STORE)
                    .entity(storedObject)
                    .build());
    }

    return storedObject;
  }


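  /**
   * Replaces an object in the graph and, if the replace succeeds, publishes a REPLACE
   * event for it to the event stream.
   *
   * @param object - The object to be replaced.
   *
   * @return - The object as it was stored by the underlying graph implementation.
   */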
  public ChampObject replaceObject(ChampObject object)
      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {

    ChampObject replacedObject = executeReplaceObject(object);

    if(replacedObject != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.REPLACE)
                    .entity(replacedObject)
                    .build());
    }

    return replacedObject;
  }


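  /**
   * Deletes an object from the graph and publishes a DELETE event for it to the event
   * stream.
   *
   * @param key - The key of the object to be deleted.
   */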
  public void deleteObject(Object key) throws ChampObjectNotExistsException {

    // Retrieve the object that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampObject> objectToDelete = Optional.empty();
    try {
      objectToDelete = retrieveObject(key);

    } catch (ChampUnmarshallingException e) {
      logger.error("Unable to generate delete object log: " + e.getMessage());
    }

    executeDeleteObject(key);

    if(objectToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.DELETE)
                    .entity(objectToDelete.get())
                    .build());
    }
  }


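  /**
   * Stores a relationship in the graph and, if the store succeeds, publishes a STORE
   * event for it to the event stream.
   *
   * @param relationship - The relationship to be stored.
   *
   * @return - The relationship as it was stored by the underlying graph implementation.
   */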
  public ChampRelationship storeRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {

    ChampRelationship storedRelationship = executeStoreRelationship(relationship);

    if(storedRelationship != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.STORE)
                    .entity(storedRelationship)
                    .build());
    }

    return storedRelationship;
  }


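  /**
   * Replaces a relationship in the graph and, if the replace succeeds, publishes a
   * REPLACE event for it to the event stream.
   *
   * @param relationship - The relationship to be replaced.
   *
   * @return - The relationship as it was stored by the underlying graph implementation.
   */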
  public ChampRelationship replaceRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException {

    ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);

    if(replacedRelationship != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.REPLACE)
                    .entity(replacedRelationship)
                    .build());
    }

    return replacedRelationship;
  }


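  /**
   * Deletes a relationship from the graph and publishes a DELETE event for it to the
   * event stream.
   *
   * @param relationship - The relationship to be deleted.
   */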
  public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {

    executeDeleteRelationship(relationship);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                  .operation(ChampOperation.DELETE)
                  .entity(relationship)
                  .build());
  }


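  /**
   * Stores a partition in the graph and, if the store succeeds, publishes a STORE event
   * for it to the event stream.
   *
   * @param partition - The partition to be stored.
   *
   * @return - The partition as it was stored by the underlying graph implementation.
   */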
  public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {

    ChampPartition storedPartition = executeStorePartition(partition);

    if(storedPartition != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.STORE)
                    .entity(storedPartition)
                    .build());
    }

    return storedPartition;
  }


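  /**
   * Deletes a partition from the graph and publishes a DELETE event for it to the event
   * stream.
   *
   * @param graph - The partition to be deleted.
   */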
  public void deletePartition(ChampPartition graph) {

    executeDeletePartition(graph);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                  .operation(ChampOperation.DELETE)
                  .entity(graph)
                  .build());
  }


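  /**
   * Stores an object index and publishes a STORE event for it to the event stream.
   *
   * @param index - The object index to be stored.
   */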
  public void storeObjectIndex(ChampObjectIndex index) {

    executeStoreObjectIndex(index);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                  .operation(ChampOperation.STORE)
                  .entity(index)
                  .build());
  }


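  /**
   * Deletes an object index and, if the index existed, publishes a DELETE event for it
   * to the event stream.
   *
   * @param indexName - The name of the object index to be deleted.
   */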
  public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {

    // Retrieve the index that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);

    executeDeleteObjectIndex(indexName);

    if(indexToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.DELETE)
                    .entity(indexToDelete.get())
                    .build());
    }
  }


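  /**
   * Stores a relationship index and publishes a STORE event for it to the event stream.
   *
   * @param index - The relationship index to be stored.
   */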
  public void storeRelationshipIndex(ChampRelationshipIndex index) {

    executeStoreRelationshipIndex(index);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                  .operation(ChampOperation.STORE)
                  .entity(index)
                  .build());
  }


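  /**
   * Deletes a relationship index and, if the index existed, publishes a DELETE event for
   * it to the event stream.
   *
   * @param indexName - The name of the relationship index to be deleted.
   */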
  public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {

    // Retrieve the index that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);

    executeDeleteRelationshipIndex(indexName);

    if(indexToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                    .operation(ChampOperation.DELETE)
                    .entity(indexToDelete.get())
                    .build());
    }
  }


  /**
   * Submits an event to be published to the event stream.
   *
   * @param anEvent - The event to be published.
   */
  public void logEvent(ChampEvent anEvent) {

    if(logger.isDebugEnabled()) {
      logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
    }

    try {

      // Try submitting the event to be published to the event bus.
      publisherPool.execute(new EventPublisher(anEvent));

    } catch (RejectedExecutionException re) {
      logger.error("Event could not be published to the event bus due to: " + re.getMessage());

    } catch (NullPointerException npe) {
      logger.error("Can not publish null event to event bus.");
    }
  }


  /**
   * This class runs as a background thread and is responsible for pulling Champ events off
   * of the internal queue and publishing them to the event stream.
   */
  private class EventPublisher implements Runnable {

    /** Partition key to use when publishing events to the event stream.  We WANT all events
     *  to go to a single partition, so we are just using a hard-coded key for every event. */
    private static final String EVENTS_PARTITION_KEY = "champEventKey";

    private ChampEvent event;

    public EventPublisher(ChampEvent event) {
      this.event = event;
    }


    @Override
    public void run() {

      boolean done = false;
      while(!done && !Thread.currentThread().isInterrupted()) {
        try {

          // Make sure that we actually have a producer instance to use to publish
          // the event with.
          if(producer != null) {

            // Try publishing the event to the event bus.
            producer.send(EVENTS_PARTITION_KEY, event.toJson());

          } else if (logger.isDebugEnabled()) {
            logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
          }

          done = true;

        } catch (IOException e) {

          // We were unable to publish to the event bus, so wait a bit and then try
          // again.
          try {
            Thread.sleep(500);

          } catch (InterruptedException e1) {
            logger.info("Stopping event publisher worker thread.");
            return;
          }
        }
      }
    }
  }
}