/**
 * ============LICENSE_START==========================================
 * org.onap.aai
 * ===================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017 Amdocs
 * ===================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END============================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
package org.openecomp.aai.champ.event;

import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.openecomp.aai.champ.ChampCapabilities;
import org.openecomp.aai.champ.ChampGraph;
import org.openecomp.aai.champ.event.ChampEvent.ChampOperation;
import org.openecomp.aai.champ.exceptions.ChampIndexNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampMarshallingException;
import org.openecomp.aai.champ.exceptions.ChampObjectNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampRelationshipNotExistsException;
import org.openecomp.aai.champ.exceptions.ChampSchemaViolationException;
import org.openecomp.aai.champ.exceptions.ChampUnmarshallingException;
import org.openecomp.aai.champ.model.ChampObject;
import org.openecomp.aai.champ.model.ChampObjectConstraint;
import org.openecomp.aai.champ.model.ChampObjectIndex;
import org.openecomp.aai.champ.model.ChampPartition;
import org.openecomp.aai.champ.model.ChampRelationship;
import org.openecomp.aai.champ.model.ChampRelationshipConstraint;
import org.openecomp.aai.champ.model.ChampRelationshipIndex;
import org.openecomp.aai.champ.model.ChampSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaPublisher;

/**
 * This class provides the hooks to allow Champ operations to be logged to an
 * event stream.
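 * <p>
 * As a rough usage sketch (the concrete subclass name and host values below are
 * hypothetical, not provided by this class), a derived graph would typically be
 * constructed with its event stream configuration supplied as properties:
 * <pre>{@code
 *   Map<String, Object> properties = new HashMap<>();
 *   properties.put(AbstractLoggingChampGraph.PARAM_EVENT_STREAM_HOSTS,
 *                  "event-bus-host1:3904,event-bus-host2:3904");
 *   properties.put(AbstractLoggingChampGraph.PARAM_EVENT_STREAM_BATCH_SIZE, 10);
 *   ChampGraph graph = new MyConcreteChampGraph(properties); // hypothetical subclass
 * }</pre>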
 */
public abstract class AbstractLoggingChampGraph implements ChampGraph {

  private static final Logger logger = LoggerFactory.getLogger(AbstractLoggingChampGraph.class);

  public abstract ChampObject executeStoreObject(ChampObject object)
      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
  public abstract ChampObject executeReplaceObject(ChampObject object)
      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException;
  public abstract Optional<ChampObject> retrieveObject(Object key) throws ChampUnmarshallingException;
  public abstract void executeDeleteObject(Object key) throws ChampObjectNotExistsException;
  public abstract Stream<ChampObject> queryObjects(Map<String, Object> queryParams);
  public abstract ChampRelationship executeStoreRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException,
             ChampSchemaViolationException, ChampRelationshipNotExistsException;
  public abstract ChampRelationship executeReplaceRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException,
             ChampSchemaViolationException, ChampRelationshipNotExistsException;
  public abstract Optional<ChampRelationship> retrieveRelationship(Object key) throws ChampUnmarshallingException;
  public abstract void executeDeleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException;
  public abstract Stream<ChampRelationship> retrieveRelationships(ChampObject object)
      throws ChampUnmarshallingException, ChampObjectNotExistsException;
  public abstract Stream<ChampRelationship> queryRelationships(Map<String, Object> queryParams);
  public abstract ChampPartition executeStorePartition(ChampPartition partition)
      throws ChampSchemaViolationException, ChampRelationshipNotExistsException,
             ChampMarshallingException, ChampObjectNotExistsException;
  public abstract void executeDeletePartition(ChampPartition graph);
  public abstract void executeStoreObjectIndex(ChampObjectIndex index);
  public abstract Optional<ChampObjectIndex> retrieveObjectIndex(String indexName);
  public abstract Stream<ChampObjectIndex> retrieveObjectIndices();
  public abstract void executeDeleteObjectIndex(String indexName) throws ChampIndexNotExistsException;
  public abstract void executeStoreRelationshipIndex(ChampRelationshipIndex index);
  public abstract Optional<ChampRelationshipIndex> retrieveRelationshipIndex(String indexName);
  public abstract Stream<ChampRelationshipIndex> retrieveRelationshipIndices();
  public abstract void executeDeleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException;
  public abstract void storeSchema(ChampSchema schema) throws ChampSchemaViolationException;
  public abstract ChampSchema retrieveSchema();
  public abstract void updateSchema(ChampObjectConstraint objectConstraint) throws ChampSchemaViolationException;
  public abstract void updateSchema(ChampRelationshipConstraint schema) throws ChampSchemaViolationException;
  public abstract void deleteSchema();
  public abstract ChampCapabilities capabilities();

  /** Configuration property for setting the comma-separated list of servers to use for
   *  communicating with the event bus. */
  public final static String PARAM_EVENT_STREAM_HOSTS = "champ.event.stream.hosts";
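  // For illustration, a value for the property above might look like
  // "event-bus-host1:3904,event-bus-host2:3904" (hypothetical host names; 3904 is
  // the conventional DMaaP/Cambria port, but any reachable port may be supplied).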
  /** Configuration property for setting the number of events that we will try to 'batch'
   *  up before sending them to the event bus. */
  public final static String PARAM_EVENT_STREAM_BATCH_SIZE = "champ.event.stream.batch-size";
  public final static Integer DEFAULT_EVENT_STREAM_BATCH_SIZE = 1;

  /** Configuration property for setting the maximum amount of time to wait for a batch of
   *  outgoing messages to fill up before sending the batch. */
  public final static String PARAM_EVENT_STREAM_BATCH_TIMEOUT = "champ.event.stream.batch-timeout";
  public final static Integer DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS = 500;

  /** Configuration property for setting the number of worker threads used to publish events. */
  public final static String PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE = "champ.event.stream.publisher-pool-size";
  public final static Integer DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE = 100;

  /** The event stream topic that we will publish Champ events to. */
  public final static String EVENT_TOPIC = "champRawEvents";

  /** Number of events to 'batch up' before actually publishing them to the event bus. */
  private Integer eventStreamBatchSize;

  /** Maximum time, in ms, to wait for a batch to fill before publishing it anyway. */
  private Integer eventStreamBatchTimeout;

  /** Number of worker threads in the publisher pool. */
  private Integer eventStreamPublisherPoolSize;

  /** Comma-separated list of hosts for connecting to the event bus. */
  private String eventStreamHosts = null;

  /** Client used for publishing messages to the event bus. */
  protected CambriaPublisher producer;

  /** Pool of worker threads that do the work of publishing the events to the event bus. */
  protected ThreadPoolExecutor publisherPool;

  /**
   * Create a new instance of the AbstractLoggingChampGraph.
   *
   * @param properties - Set of configuration properties for this graph instance.
   */
  protected AbstractLoggingChampGraph(Map<String, Object> properties) {

    // Extract the necessary parameters from the configuration properties.
    configure(properties);

    // Create the executor pool that will do the work of publishing events to the event
    // bus.  Note that Executors.newFixedThreadPool is backed by an unbounded work queue,
    // so submissions are only rejected once the pool has been shut down.
    publisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(eventStreamPublisherPoolSize);

    // Make sure that we were actually provided a list of hosts for connecting to the
    // event bus before we try to do anything useful.
    if(eventStreamHosts == null) {

      // We were not supplied a list of event bus hosts, so just bail.
      logger.error("Cannot initialize event stream publisher without at least one event bus host.");
      logger.error("NOTE!! Champ events will NOT be published to the event stream!");
      return;
    }

    try {

      // Instantiate the producer that we will use to publish events to the event stream.
      setProducer(new CambriaClientBuilders.PublisherBuilder()
                        .usingHosts(eventStreamHosts)
                        .onTopic(EVENT_TOPIC)
                        .limitBatch(eventStreamBatchSize, eventStreamBatchTimeout)
                        .build());

    } catch (MalformedURLException | GeneralSecurityException e) {

      logger.error("Could not instantiate event stream producer due to: " + e.getMessage());
      logger.error("NOTE: Champ events will NOT be published to the event stream");
      producer = null;
    }
  }
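  // Lifecycle note: callers are expected to invoke shutdown() when they are finished
  // with the graph so that the publisher pool is stopped and the event bus producer
  // is closed (see shutdown() below).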
  /**
   * Process the configuration properties supplied for this graph instance.
   *
   * @param properties - Configuration parameters.
   */
  private void configure(Map<String, Object> properties) {

    eventStreamBatchSize =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_SIZE, DEFAULT_EVENT_STREAM_BATCH_SIZE);
    eventStreamBatchTimeout =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_BATCH_TIMEOUT, DEFAULT_EVENT_STREAM_BATCH_TIMEOUT_MS);
    eventStreamPublisherPoolSize =
        (Integer) getProperty(properties, PARAM_EVENT_STREAM_PUBLISHER_POOL_SIZE, DEFAULT_EVENT_STREAM_PUBLISHER_POOL_SIZE);

    if(properties.containsKey(PARAM_EVENT_STREAM_HOSTS)) {
      eventStreamHosts = (String) properties.get(PARAM_EVENT_STREAM_HOSTS);
    }
  }

  public void setProducer(CambriaPublisher aProducer) {
    producer = aProducer;
  }

  private Object getProperty(Map<String, Object> properties, String property, Object defaultValue) {

    if(properties.containsKey(property)) {
      return properties.get(property);
    } else {
      return defaultValue;
    }
  }

  @Override
  public void shutdown() {

    if(publisherPool != null) {
      publisherPool.shutdown();

      try {
        publisherPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Preserve the interrupt status so that callers can observe it.
        Thread.currentThread().interrupt();
      }
    }

    if(producer != null) {
      producer.close();
    }
  }

  public ChampObject storeObject(ChampObject object)
      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {

    ChampObject storedObject = executeStoreObject(object);

    if(storedObject != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.STORE)
                  .entity(storedObject)
                  .build());
    }

    return storedObject;
  }

  public ChampObject replaceObject(ChampObject object)
      throws ChampMarshallingException, ChampSchemaViolationException, ChampObjectNotExistsException {

    ChampObject replacedObject = executeReplaceObject(object);

    if(replacedObject != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.REPLACE)
                  .entity(replacedObject)
                  .build());
    }

    return replacedObject;
  }

  public void deleteObject(Object key) throws ChampObjectNotExistsException {

    // Retrieve the object that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampObject> objectToDelete = Optional.empty();
    try {
      objectToDelete = retrieveObject(key);

    } catch (ChampUnmarshallingException e) {
      logger.error("Unable to generate delete object log: " + e.getMessage());
    }

    executeDeleteObject(key);

    if(objectToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.DELETE)
                  .entity(objectToDelete.get())
                  .build());
    }
  }

  public ChampRelationship storeRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException, ChampObjectNotExistsException,
             ChampSchemaViolationException, ChampRelationshipNotExistsException {

    ChampRelationship storedRelationship = executeStoreRelationship(relationship);

    if(storedRelationship != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.STORE)
                  .entity(storedRelationship)
                  .build());
    }

    return storedRelationship;
  }
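  /* For illustration only: the store/replace/delete methods in this class are
   * template methods.  A concrete subclass supplies the storage primitive, and
   * the wrapper takes care of publishing the corresponding event.  The subclass
   * and backing store below are hypothetical, not part of this class.
   *
   *   public class MyConcreteChampGraph extends AbstractLoggingChampGraph {
   *
   *     @Override
   *     public ChampObject executeStoreObject(ChampObject object) {
   *       return myBackingStore.write(object);  // hypothetical storage call
   *     }
   *
   *     ...
   *   }
   */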
  public ChampRelationship replaceRelationship(ChampRelationship relationship)
      throws ChampUnmarshallingException, ChampMarshallingException,
             ChampSchemaViolationException, ChampRelationshipNotExistsException {

    ChampRelationship replacedRelationship = executeReplaceRelationship(relationship);

    if(replacedRelationship != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.REPLACE)
                  .entity(replacedRelationship)
                  .build());
    }

    return replacedRelationship;
  }

  public void deleteRelationship(ChampRelationship relationship) throws ChampRelationshipNotExistsException {

    executeDeleteRelationship(relationship);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                .operation(ChampOperation.DELETE)
                .entity(relationship)
                .build());
  }

  public ChampPartition storePartition(ChampPartition partition)
      throws ChampSchemaViolationException, ChampRelationshipNotExistsException,
             ChampMarshallingException, ChampObjectNotExistsException {

    ChampPartition storedPartition = executeStorePartition(partition);

    if(storedPartition != null) {

      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.STORE)
                  .entity(storedPartition)
                  .build());
    }

    return storedPartition;
  }

  public void deletePartition(ChampPartition graph) {

    executeDeletePartition(graph);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                .operation(ChampOperation.DELETE)
                .entity(graph)
                .build());
  }

  public void storeObjectIndex(ChampObjectIndex index) {

    executeStoreObjectIndex(index);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                .operation(ChampOperation.STORE)
                .entity(index)
                .build());
  }

  public void deleteObjectIndex(String indexName) throws ChampIndexNotExistsException {

    // Retrieve the index that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampObjectIndex> indexToDelete = retrieveObjectIndex(indexName);

    executeDeleteObjectIndex(indexName);

    if(indexToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.DELETE)
                  .entity(indexToDelete.get())
                  .build());
    }
  }

  public void storeRelationshipIndex(ChampRelationshipIndex index) {

    executeStoreRelationshipIndex(index);

    // Update the event stream with the current operation.
    logEvent(ChampEvent.builder()
                .operation(ChampOperation.STORE)
                .entity(index)
                .build());
  }

  public void deleteRelationshipIndex(String indexName) throws ChampIndexNotExistsException {

    // Retrieve the index that we are deleting before it's gone, so that we can
    // report it to the event stream.
    Optional<ChampRelationshipIndex> indexToDelete = retrieveRelationshipIndex(indexName);

    executeDeleteRelationshipIndex(indexName);

    if(indexToDelete.isPresent()) {
      // Update the event stream with the current operation.
      logEvent(ChampEvent.builder()
                  .operation(ChampOperation.DELETE)
                  .entity(indexToDelete.get())
                  .build());
    }
  }

  /**
   * Submits an event to be published to the event stream.
   *
   * @param anEvent - The event to be published.
   */
  public void logEvent(ChampEvent anEvent) {

    if(logger.isDebugEnabled()) {
      logger.debug("Submitting event to be published to the event bus: " + anEvent.toString());
    }

    try {

      // Try submitting the event to be published to the event bus.
      publisherPool.execute(new EventPublisher(anEvent));

    } catch (RejectedExecutionException re) {
      logger.error("Event could not be published to the event bus due to: " + re.getMessage());

    } catch (NullPointerException npe) {
      logger.error("Cannot publish null event to event bus.");
    }
  }

  /**
   * This class runs as a background task and is responsible for publishing Champ
   * events, submitted to the publisher pool's work queue, to the event stream.
   */
  private class EventPublisher implements Runnable {
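    // Note on delivery semantics: each EventPublisher task carries exactly one
    // event.  If the send fails with an IOException, the task retries every
    // 500 ms until the send succeeds or the worker thread is interrupted.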
    /** Partition key to use when publishing events to the event stream.  We WANT all events
     *  to go to a single partition, so we are just using a hard-coded key for every event. */
    private static final String EVENTS_PARTITION_KEY = "champEventKey";

    /** The event that this task will publish. */
    private ChampEvent event;

    public EventPublisher(ChampEvent event) {
      this.event = event;
    }

    @Override
    public void run() {

      boolean done = false;
      while(!done && !Thread.currentThread().isInterrupted()) {
        try {

          // Make sure that we actually have a producer instance to use to publish
          // the event with.
          if(producer != null) {

            // Try publishing the event to the event bus.
            producer.send(EVENTS_PARTITION_KEY, event.toJson());

          } else if (logger.isDebugEnabled()) {
            logger.debug("Event bus producer is not instantiated - will not attempt to publish event");
          }

          done = true;

        } catch (IOException e) {

          // We were unable to publish to the event bus, so wait a bit and then try
          // again.
          try {
            Thread.sleep(500);

          } catch (InterruptedException e1) {
            logger.info("Stopping event publisher worker thread.");
            // Preserve the interrupt status for the pool's worker thread.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
  }
}