Add multi-oxm using schemaIngest library
[aai/data-router.git] / src / main / java / org / onap / aai / datarouter / policy / SpikeEntityEventPolicy.java
index 36bb142..8f0e426 100644 (file)
@@ -30,25 +30,27 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.eclipse.persistence.dynamic.DynamicType;
 import org.eclipse.persistence.internal.helper.DatabaseField;
 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.datarouter.entity.SpikeEventEntity;
 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
-import org.onap.aai.datarouter.entity.SpikeEventVertex;
 import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
+import org.onap.aai.datarouter.entity.SpikeEventEntity;
+import org.onap.aai.datarouter.entity.SpikeEventVertex;
 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
 import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
 import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
-import org.onap.aai.datarouter.util.OxmModelLoader;
+import org.onap.aai.datarouter.schema.OxmModelLoader;
 import org.onap.aai.datarouter.util.RouterServiceUtil;
 import org.onap.aai.datarouter.util.SearchServiceAgent;
 import org.onap.aai.restclient.client.Headers;
@@ -68,17 +70,38 @@ public class SpikeEntityEventPolicy implements Processor {
 
   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
 
-
-  private static final String ACTION_CREATE = "create";
-  private static final String EVENT_VERTEX = "vertex";
-  private static final String ACTION_DELETE = "delete";
-  private static final String ACTION_UPDATE = "update";
+  /**
+   * Note (8-June-2018):
+   * 
+   * At present we don't need to support every event-type that could be present in the spike-events.
+   * The only one we want is a SPIKE "update-notification". In the future perhaps we need to add some
+   * configurability to the camel-route itself with a json camel filtering component so that routing
+   * logic can be modified as part of the camel route spring-xml instead of hard-coding the
+   * event filtering in here.
+   */
+  
   private static final String PROCESS_SPIKE_EVENT = "Process Spike Event";
+  
+  private static final String UPDATE_NOTIFICATION = "update-notification";
+  private static final String SPIKE = "SPIKE";
+  
+  private static final String HEADER_KEY = "header";
+  private static final String EVENT_TYPE_KEY = "event-type";
+  private static final String SOURCE_NAME_KEY = "source-name";
+  private static final String BODY_KEY = "body";
   private static final String OPERATION_KEY = "operation";
 
+  private static final String VERTEX_KEY = "vertex";
+  private static final String VERTEX_KEY_KEY = "key";
+  private static final String VERTEX_TYPE_KEY = "type";
+  private static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+  
+  private static final String CREATE = "create";
+  private static final String DELETE = "delete";
+  private static final String UPDATE = "update";
 
   private static final List<String> SUPPORTED_ACTIONS =
-      Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+      Arrays.asList(CREATE, UPDATE, DELETE);
 
   Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
   private String oxmVersion = null;
@@ -122,7 +145,7 @@ public class SpikeEntityEventPolicy implements Processor {
   private void parseLatestOxmVersion() {
     int latestVersion = -1;
     if (oxmVersionContextMap != null) {
-      Iterator it = oxmVersionContextMap.entrySet().iterator();
+      Iterator<Entry<String, DynamicJAXBContext>> it = oxmVersionContextMap.entrySet().iterator();
       while (it.hasNext()) {
         Map.Entry pair = (Map.Entry) it.next();
 
@@ -148,6 +171,7 @@ public class SpikeEntityEventPolicy implements Processor {
     logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
   }
 
+  
 
   /**
    * Convert object to json.
@@ -177,65 +201,109 @@ public class SpikeEntityEventPolicy implements Processor {
     setResponse(exchange, ResponseType.FAILURE, additionalInfo);
   }
 
-  public boolean isJSONValid(String test) {
+  @Override
+  public void process(Exchange exchange) /*throws Exception*/ {
+
+    long startTime = System.currentTimeMillis();
+    
+    final String eventPayload = exchange.getIn().getBody().toString();
+    JSONObject mainJson = null;
+    
     try {
-      new JSONObject(test);
-    } catch (JSONException ex) {
-      return false;
+      
+      /*
+       * It is expected that mainJson will have multiple top level objects:
+       *  - header
+       *  - body
+       *  - result
+       */
+      
+      mainJson = new JSONObject(eventPayload);
+    } catch (JSONException exc) {
+      returnWithError(exchange, eventPayload, "Invalid Payload");
+      return;
     }
-    return true;
-  }
+    
+    JSONObject eventHeader = mainJson.getJSONObject(HEADER_KEY);
 
-  @Override
-  public void process(Exchange exchange) throws Exception {
+    if (eventHeader == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+      return;
+    }
+    
+    /*
+     * Only process SPIKE update-notification events 
+     */
+    
+    final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+    final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
 
-    long startTime = System.currentTimeMillis();
-    String uebPayload = exchange.getIn().getBody().toString();
-    if (uebPayload == null || !isJSONValid(uebPayload)) {
-      uebPayload = exchange.getIn().getBody(String.class);
-      if (uebPayload == null || !isJSONValid(uebPayload)) {
-        returnWithError(exchange, uebPayload, "Invalid Payload");
-        return;
-      }
+    if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+      // drop event
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+          + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+      /*
+       * I don't think ignoring a non-applicable event constitutes a failure.
+       */
+
+      setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+      return;
     }
+    
+    JSONObject eventBody = mainJson.getJSONObject(BODY_KEY);
 
+    if (eventBody == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+      return;
+    }
     
-    JSONObject mainJson = new JSONObject(uebPayload);
-    String action = mainJson.getString(OPERATION_KEY);
+    String action = eventBody.getString(OPERATION_KEY);
     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Unrecognized action '" + action + "'", uebPayload);
+          "Unrecognized action '" + action + "'", eventPayload);
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Unrecognized action '" + action + "'");
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
     }
 
-    // Load the UEB payload data, any errors will result in a failure and discard
-
-    JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
-    if (spikeObjVertex == null) {
-      returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+    // Load the event body data, any errors will result in a failure and discard
+    
+    JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+    if (spikeVertex == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
       return;
     }
 
-    SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
+    SpikeEventVertex eventVertex = null;
+    try {
+      eventVertex = initializeSpikeEventVertex(spikeVertex);
+    } catch (JSONException exc) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Error initializating spike event.  Error: " + exc.getMessage(),
+          eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Error initializating spike event.  Error: " + exc.getMessage());
 
-    DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return;
+    }
+
+    DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
     if (oxmJaxbContext == null) {
       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
-      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", eventPayload);
 
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
     }
 
-
-
     String entityType = eventVertex.getType();
     if (entityType == null || entityType.isEmpty()) {
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Payload header missing entity type", uebPayload);
+          "Payload header missing entity type", eventPayload);
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload header missing entity type");
 
@@ -246,7 +314,7 @@ public class SpikeEntityEventPolicy implements Processor {
     String entityKey = eventVertex.getKey();
     if (entityKey == null || entityKey.isEmpty()) {
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
-          uebPayload);
+          eventPayload);
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload vertex missing entity key");
 
@@ -256,7 +324,7 @@ public class SpikeEntityEventPolicy implements Processor {
     String entityLink = eventVertex.getEntityLink();
     if (entityLink == null || entityLink.isEmpty()) {
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Payload header missing entity link", uebPayload);
+          "Payload header missing entity link", eventPayload);
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload header missing entity link");
 
@@ -267,8 +335,7 @@ public class SpikeEntityEventPolicy implements Processor {
     // log the fact that all data are in good shape
     logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
     logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
-        uebPayload);
-
+        eventPayload);
 
     // Process for building SpikeEventEntity object
     String[] entityTypeArr = entityType.split("-");
@@ -276,7 +343,7 @@ public class SpikeEntityEventPolicy implements Processor {
     for (String entityWord : entityTypeArr) {
       oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1);
     }
-
+    
     List<String> searchableAttr =
         getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
     if (searchableAttr == null) {
@@ -284,28 +351,28 @@ public class SpikeEntityEventPolicy implements Processor {
           "Searchable attribute not found for payload entity type '" + entityType + "'");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
           "Searchable attribute not found for payload entity type '" + entityType + "'",
-          uebPayload);
+          eventPayload);
 
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
     }
 
     String entityPrimaryKeyFieldName =
-        getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+        getEntityPrimaryKeyFieldName(oxmJaxbContext, eventPayload, oxmEntityType, entityType);
     if (entityPrimaryKeyFieldName == null) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload missing primary key attribute");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Payload missing primary key attribute", uebPayload);
+          "Payload missing primary key attribute", eventPayload);
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
     }
-    String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
-    if (entityPrimaryKeyFieldValue.isEmpty()) {
+    String entityPrimaryKeyFieldValue = lookupValueUsingKey(eventPayload, entityPrimaryKeyFieldName);
+    if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload missing primary value attribute");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Payload missing primary value attribute", uebPayload);
+          "Payload missing primary value attribute", eventPayload);
 
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
@@ -321,12 +388,14 @@ public class SpikeEntityEventPolicy implements Processor {
     spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldValue);
     spikeEventEntity.setEntityType(entityType);
     spikeEventEntity.setLink(entityLink);
+    
+    System.out.println(spikeEventEntity);
 
-    if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
+    if (!getSearchTags(spikeEventEntity, searchableAttr, eventPayload, action)) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload missing searchable attribute for entity type '" + entityType + "'");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
-          "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
+          "Payload missing searchable attribute for entity type '" + entityType + "'", eventPayload);
 
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
@@ -338,7 +407,7 @@ public class SpikeEntityEventPolicy implements Processor {
     } catch (NoSuchAlgorithmException e) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
-          uebPayload);
+          eventPayload);
 
       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
       return;
@@ -363,60 +432,19 @@ public class SpikeEntityEventPolicy implements Processor {
     exchange.getOut().setBody(additionalInfo);
   }
 
+  private SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
 
-  /*
-   * Load the UEB JSON payload, any errors would result to a failure case response.
-   */
-  private JSONObject getUebContentAsJson(String payload, String contentKey) {
-
-    JSONObject uebJsonObj;
-    JSONObject uebObjContent;
-
-    try {
-      uebJsonObj = new JSONObject(payload);
-    } catch (JSONException e) {
-      logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
-      logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
-      return null;
-    }
-
-    if (uebJsonObj.has(contentKey)) {
-      uebObjContent = uebJsonObj.getJSONObject(contentKey);
-    } else {
-      logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
-      logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
-      return null;
-    }
-
-    return uebObjContent;
-  }
-
-
-  private SpikeEventVertex initializeSpikeEventVertex(String payload) {
-
-    SpikeEventVertex eventVertex = null;
-    ObjectMapper mapper = new ObjectMapper();
-
-    // Make sure that were were actually passed in a valid string.
-    if (payload == null || payload.isEmpty()) {
-      logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
-      logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+    /*
+     * These are all critical keys
+     */
 
-      return eventVertex;
-    }
+    final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+    final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+    final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
 
-    // Marshal the supplied string into a UebEventHeader object.
-    try {
-      eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
-    } catch (JsonProcessingException e) {
-      logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
-    } catch (Exception e) {
-      logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
-    }
-
-    if (eventVertex != null) {
-      logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
-    }
+    SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+    eventVertex.setSchemaVersion(vertexSchemaVersion);
+    logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
 
     return eventVertex;
 
@@ -453,9 +481,9 @@ public class SpikeEntityEventPolicy implements Processor {
     try {
       jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
     } catch (IOException e) {
-      logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+      logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
           payload);
-      logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+      logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
           "");
     }
 
@@ -498,7 +526,7 @@ public class SpikeEntityEventPolicy implements Processor {
   private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
       String entityType, String fieldName) {
 
-    DynamicType entity = (DynamicType) oxmJaxbContext.getDynamicType(oxmEntityType);
+    DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
     if (entity == null) {
       return null;
     }
@@ -584,8 +612,8 @@ public class SpikeEntityEventPolicy implements Processor {
 
       String entityId = eventEntity.getId();
 
-      if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
-          || action.equalsIgnoreCase(ACTION_UPDATE)) {
+      if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+          || action.equalsIgnoreCase(UPDATE)) {
 
         // Run the GET to retrieve the ETAG from the search service
         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -603,11 +631,11 @@ public class SpikeEntityEventPolicy implements Processor {
         // Write the entity to the search service.
         // PUT
         searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
-      } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+      } else if (action.equalsIgnoreCase(CREATE)) {
         // Write the entry to the search service.
         searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
 
-      } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+      } else if (action.equalsIgnoreCase(DELETE)) {
         // Run the GET to retrieve the ETAG from the search service
         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
 
@@ -619,6 +647,13 @@ public class SpikeEntityEventPolicy implements Processor {
           } else {
             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
           }
+          
+          /*
+           * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+           * we specify a Content-Type.
+           */
+
+          headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
 
           searchAgent.deleteDocument(index, eventEntity.getId(), headers);
         } else {