2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.openecomp.aai.champ.event;
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
28 import java.util.Optional;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.stream.Stream;
35 import org.openecomp.aai.champ.ChampCapabilities;
36 import org.openecomp.aai.champ.ChampGraph;
37 import org.openecomp.aai.champ.event.ChampEvent.ChampOperation;
38 import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
39 import org.openecomp.aai.champ.exceptions.ChampMarshallingException;
40 import org.openecomp.aai.champ.exceptions.ChampObjectNotExistsException;
41 import org.openecomp.aai.champ.exceptions.ChampRelationshipNotExistsException;
42 import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
43 import org.openecomp.aai.champ.exceptions.ChampUnmarshallingException;
44 import org.openecomp.aai.champ.model.ChampObject;
45 import org.openecomp.aai.champ.model.ChampObjectConstraint;
46 import org.openecomp.aai.champ.model.ChampObjectIndex;
47 import org.openecomp.aai.champ.model.ChampPartition;
48 import org.openecomp.aai.champ.model.ChampRelationship;
49 import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
50 import org.openecomp.aai.champ.model.ChampRelationshipIndex;
51 import org.openecomp.aai.champ.model.ChampSchema;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 import com.att.nsa.cambria.client.CambriaClientBuilders;
56 import com.att.nsa.cambria.client.CambriaPublisher;
/**
 * This class provides the hooks to allow Champ operations to be logged to an event
 * stream.
 */
63 public abstract class AbstractLoggingChampGraph implements ChampGraph {
65 private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);
67 public abstract ChampObject executeStoreObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
68 public abstract ChampObject executeReplaceObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
69 public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException;
70 public abstract void executeDeleteObject(Object key) throws ChampObjectNotExistsException;
71 public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams);
72 public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException;
73 public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship) throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException;
74 public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException;
75 public abstract void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
76 public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object) throws ChampUnmarshallingException, ChampObjectNotExistsException;
77 public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams);
78 public abstract ChampPartition executeStorePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException;
79 public abstract void executeDeletePartition(ChampPartition graph);
80 public abstract void executeStoreObjectIndex(ChampObjectIndex index);
81 public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
82 public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
83 public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
84 public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
85 public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
86 public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
87 public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
88 public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
89 public abstract ChampSchema retrieveSchema();
90 public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
91 public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
92 public abstract void deleteSchema();
93 public abstract ChampCapabilities capabilities();
96 /** Configuration property for setting the comma-separated list of servers to use for
97 * communicating with the event bus. */
98 public final static String PARAM_EVENT_STREAM_HOSTS = "champ.event.stream.hosts";
100 /** Configuration property for setting the number of events that we will try to 'batch'
101 * up before sending them to the event bus. */
102 public final static String PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
103 public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;
105 /** Configuration property for setting the maximum amount of time to wait for a batch of
106 * outgoing messages to fill up before sending the batch. */
107 public final static String PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
108 public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500;
110 public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
111 public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;
113 /** The event stream topic that we will publish Champ events to. */
114 public final static String EVENT_TOPIC = "champRawEvents";
116 /** Number of events to 'batch up' before actually publishing them to the event bus. */
117 private Integer eventStreamBatchSize;
119 private Integer eventStreamBatchTimeout;
121 private Integer eventStreamPublisherPoolSize;
123 /** Comma-separated list of hosts for connecting to the event bus. */
124 private String eventStreamHosts = null;
126 /** Client used for publishing messages to the event bus. */
127 protected CambriaPublisher producer;
129 /** Pool of worker threads that do the work of publishing the events to the event bus. */
130 protected ThreadPoolExecutor publisherPool;
/**
 * Create a new instance of the AbstractLoggingChampGraph.
 *
 * <p>Reads the event stream settings from the supplied properties, creates the pool of
 * publisher threads and, when a list of event bus hosts was configured, builds the
 * producer used to publish events.  If no hosts are configured, or the producer cannot
 * be built, an error is logged and the instance is left without a producer.
 *
 * @param properties - Set of configuration properties for this graph instance.
 */
protected AbstractLoggingChampGraph(Map<String, Object> properties) {

  // Extract the necessary parameters from the configuration properties.
  configure(properties);

  // Create the executor pool that will do the work of publishing events to the event bus.
  publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);

  // Make sure that we are actually provided a list of hosts for connecting to the event
  // bus before we actually try to do anything useful.
  if (eventStreamHosts == null) {

    // We were not supplied a list of event bus hosts, so just bail.
    logger.error("Cannot initialize event stream publisher without at least one event bus host.");
    logger.error("NOTE!! Champ events will NOT be published to the event stream!");
    return;
  }

  try {

    // Instantiate the producer that we will use to publish events to the event stream.
    setProducer(new CambriaClientBuilders.PublisherBuilder()
        .usingHosts(eventStreamHosts)
        .onTopic(EVENT_TOPIC)
        .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
        .build());

  } catch (MalformedURLException | GeneralSecurityException e) {

    // Pass the exception itself as the trailing argument so that the stack trace is
    // recorded along with the message.  setProducer was never reached, so no producer
    // is installed by this constructor.
    logger.error("Could not instantiate event stream producer due to: {}", e.getMessage(), e);
    logger.error("NOTE: Champ events will NOT be published to the event stream");
  }
}
175 * Process the configuration properties supplied for this graph instance.
177 * @param properties - Configuration parameters.
179 private void configure(Map<String, Object> properties) {
181 eventStreamBatchSize =
182 (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE, DEFAULT_EVENT_STREAM_BATCH_SIZE);
183 eventStreamBatchTimeout =
184 (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
185 eventStreamPublisherPoolSize =
186 (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);
188 if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
189 eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
193 public void setProducer(CambriaPublisher aProducer) {
195 producer = aProducer;
198 private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {
200 if(properties.containsKey(property)) {
201 return properties.get(property);
208 public void shutdown() {
210 if(publisherPool != null) {
211 publisherPool.shutdown();
214 publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
215 } catch (InterruptedException e) {}
218 if(producer != null) {
223 public ChampObject storeObject(ChampObject object) throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
225 ChampObject storedObject = executeStoreObject(object);
227 if(storedObject != null) {
229 // Update the event stream with the current operation.
230 logEvent(ChampEvent.builder()
231 .operation(ChampOperation.STORE)
232 .entity(storedObject)
240 public ChampObject replaceObject(ChampObject object)
241 throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {
243 ChampObject replacedObject = executeReplaceObject(object);
245 if(replacedObject != null) {
247 // Update the event stream with the current operation.
248 logEvent(ChampEvent.builder()
249 .operation(ChampOperation.REPLACE)
250 .entity(replacedObject)
254 return replacedObject;
258 public void deleteObject(Object key) throws ChampObjectNotExistsException {
260 // Retrieve the object that we are deleting before it's gone, so that we can
261 // report it to the event stream.
262 Optional<ChampObject> objectToDelete = Optional.empty();
264 objectToDelete = retrieveObject(key);
266 } catch (ChampUnmarshallingException e) {
267 logger.error("Unable to generate delete object log: " + e.getMessage());
270 executeDeleteObject(key);
272 if(objectToDelete.isPresent()) {
273 // Update the event stream with the current operation.
274 logEvent(ChampEvent.builder()
275 .operation(ChampOperation.DELETE)
276 .entity(objectToDelete.get())
282 public ChampRelationship storeRelationship(ChampRelationship relationship)
283 throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException, ChampSchemaViolationException, ChampRelationshipNotExistsException {
285 ChampRelationship storedRelationship = executeStoreRelationship(relationship);
287 if(storedRelationship != null) {
289 // Update the event stream with the current operation.
290 logEvent(ChampEvent.builder()
291 .operation(ChampOperation.STORE)
292 .entity(storedRelationship)
296 return storedRelationship;
300 public ChampRelationship replaceRelationship(ChampRelationship relationship)
301 throws ChampUnmarshallingException, ChampMarshallingException, ChampSchemaViolationException, ChampRelationshipNotExistsException {
303 ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);
305 if(replacedRelationship != null) {
307 // Update the event stream with the current operation.
308 logEvent(ChampEvent.builder()
309 .operation(ChampOperation.REPLACE)
310 .entity(replacedRelationship)
314 return replacedRelationship;
318 public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {
320 executeDeleteRelationship(relationship);
322 // Update the event stream with the current operation.
323 logEvent(ChampEvent.builder()
324 .operation(ChampOperation.DELETE)
325 .entity(relationship)
330 public ChampPartition storePartition(ChampPartition partition) throws ChampSchemaViolationException, ChampRelationshipNotExistsException, ChampMarshallingException, ChampObjectNotExistsException {
332 ChampPartition storedPartition = executeStorePartition(partition);
334 if(storedPartition != null) {
336 // Update the event stream with the current operation.
337 logEvent(ChampEvent.builder()
338 .operation(ChampOperation.STORE)
339 .entity(storedPartition)
343 return storedPartition;
347 public void deletePartition(ChampPartition graph) {
349 executeDeletePartition(graph);
351 // Update the event stream with the current operation.
352 logEvent(ChampEvent.builder()
353 .operation(ChampOperation.DELETE)
359 public void storeObjectIndex(ChampObjectIndex index) {
361 executeStoreObjectIndex(index);
363 // Update the event stream with the current operation.
364 logEvent(ChampEvent.builder()
365 .operation(ChampOperation.STORE)
371 public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {
373 // Retrieve the index that we are deleting before it's gone, so that we can
374 // report it to the event stream.
375 Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);
377 executeDeleteObjectIndex(indexName);
379 if(indexToDelete.isPresent()) {
380 // Update the event stream with the current operation.
381 logEvent(ChampEvent.builder()
382 .operation(ChampOperation.DELETE)
383 .entity(indexToDelete.get())
389 public void storeRelationshipIndex(ChampRelationshipIndex index) {
391 executeStoreRelationshipIndex(index);
393 // Update the event stream with the current operation.
394 logEvent(ChampEvent.builder()
395 .operation(ChampOperation.STORE)
401 public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {
403 // Retrieve the index that we are deleting before it's gone, so that we can
404 // report it to the event stream.
405 Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);
407 executeDeleteRelationshipIndex(indexName);
409 if(indexToDelete.isPresent()) {
410 // Update the event stream with the current operation.
411 logEvent(ChampEvent.builder()
412 .operation(ChampOperation.DELETE)
413 .entity(indexToDelete.get())
420 * Submits an event to be published to the event stream.
422 * @param anEvent - The event to be published.
424 public void logEvent(ChampEvent anEvent) {
426 if(logger.isDebugEnabled()) {
427 logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
432 // Try submitting the event to be published to the event bus.
433 publisherPool.execute(new EventPublisher(anEvent));
435 } catch (RejectedExecutionException re) {
436 logger.error("Event could not be published to the event bus due to: " + re.getMessage());
438 } catch (NullPointerException npe) {
439 logger.error("Can not publish null event to event bus.");
445 * This class runs as a background thread and is responsible for pulling Champ events off
446 * of the internal queue and publishing them to the event stream.
448 private class EventPublisher implements Runnable {
450 /** Partition key to use when publishing events to the event stream. We WANT all events
451 * to go to a single partition, so we are just using a hard-coded key for every event. */
452 private static final String EVENTS_PARTITION_KEY = "champEventKey";
454 private ChampEvent event;
456 public EventPublisher(ChampEvent event) {
464 boolean done = false;
465 while(!done && !Thread.currentThread().isInterrupted()) {
468 // Make sure that we actually have a producer instance to use to publish
470 if(producer != null) {
472 // Try publishing the event to the event bus.
473 producer.send(EVENTS_PARTITION_KEY, event.toJson());
475 } else if (logger.isDebugEnabled()) {
476 logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
481 } catch (IOException e) {
483 // We were unable to publish to the event bus, so wait a bit and then try
488 } catch (InterruptedException e1) {
489 logger.info("Stopping event publisher worker thread.");