Fix aggregate-vnf search index population
[aai/data-router.git] / src / main / java / org / onap / aai / datarouter / policy / AbstractSpikeEntityEventProcessor.java
index 9627365..50d344b 100644 (file)
@@ -26,6 +26,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -36,21 +37,25 @@ import org.apache.camel.Processor;
 import org.eclipse.persistence.dynamic.DynamicType;
 import org.eclipse.persistence.internal.helper.DatabaseField;
 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.schema.OxmModelLoader;
 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
-import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
 import org.onap.aai.datarouter.entity.SpikeEventEntity;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
 import org.onap.aai.datarouter.entity.SpikeEventVertex;
 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
-import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
-import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
-import org.onap.aai.datarouter.util.OxmModelLoader;
 import org.onap.aai.datarouter.util.RouterServiceUtil;
 import org.onap.aai.datarouter.util.SearchServiceAgent;
+import org.onap.aai.entity.OxmEntityDescriptor;
+import org.onap.aai.util.EntityOxmReferenceHelper;
+import org.onap.aai.util.ExternalOxmModelProcessor;
+import org.onap.aai.util.Version;
+import org.onap.aai.util.VersionedOxmEntities;
 import org.onap.aai.restclient.client.Headers;
 import org.onap.aai.restclient.client.OperationResult;
 import org.onap.aai.restclient.rest.HttpUtil;
@@ -59,30 +64,41 @@ import org.slf4j.MDC;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
 
 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
 
   protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
 
-
-  protected final String ACTION_CREATE = "create";
-  private final String EVENT_VERTEX = "vertex";
-  public final static String ACTION_DELETE = "delete";
-  protected final String ACTION_UPDATE = "update";
   protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
-  private final String OPERATION_KEY = "operation";
 
+  protected static final String UPDATE_NOTIFICATION = "update-notification";
+  protected static final String SPIKE = "SPIKE";
+  
+  protected static final String HEADER_KEY = "header";
+  protected static final String EVENT_TYPE_KEY = "event-type";
+  protected static final String SOURCE_NAME_KEY = "source-name";
+  protected static final String BODY_KEY = "body";
+  protected static final String OPERATION_KEY = "operation";
+  
+  protected static final String VERTEX_KEY = "vertex";
+  protected static final String VERTEX_PROPERTIES_KEY = "properties";
+  protected static final String VERTEX_KEY_KEY = "key";
+  protected static final String VERTEX_TYPE_KEY = "type";
+  protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+  
+  protected static final String CREATE = "create";
+  protected static final String DELETE = "delete";
+  protected static final String UPDATE = "update";
+  
   protected enum ResponseType {
     SUCCESS, PARTIAL_SUCCESS, FAILURE;
   }
 
-  private final List<String> SUPPORTED_ACTIONS =
-      Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+  protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
 
-  Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
-  private String oxmVersion = null;
+  protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
+  protected String oxmVersion = null;
 
   /** Agent for communicating with the Search Service. */
   protected SearchServiceAgent searchAgent = null;
@@ -268,9 +284,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
 
     // Load the UEB payload data, any errors will result in a failure and discard
 
-    JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
+    JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
     if (spikeObjVertex == null) {
-      returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+      returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
       return null;
     }
 
@@ -279,7 +295,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
   }
 
   protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
-    DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
+    DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
     if (oxmJaxbContext == null) {
       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
@@ -336,7 +352,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
       String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
      
     String entityPrimaryKeyFieldName =
-        getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+        getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
     if (entityPrimaryKeyFieldName == null) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload missing primary key attribute");
@@ -346,7 +362,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
       return null;
     }
     String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
-    if (entityPrimaryKeyFieldValue.isEmpty()) {
+    if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
           "Payload missing primary value attribute");
       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
@@ -391,13 +407,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
 
 
  protected String getOxmEntityType(String entityType) {
-
-   String[] entityTypeArr = entityType.split("-");
-   String oxmEntityType = "";
-   for (String entityWord : entityTypeArr) {
-     oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1);
-   }
-   return oxmEntityType;
+   return new OxmEntityTypeConverter().convert(entityType);
  }
  
  protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
@@ -425,8 +435,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
 
     // Make sure that were were actually passed in a valid string.
     if (payload == null || payload.isEmpty()) {
-      logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
-      logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+      logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
+      logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
 
       return eventVertex;
     }
@@ -448,7 +458,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
   }
 
 
-  private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String payload,
+  private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
       String oxmEntityType, String entityType) {
 
     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
@@ -478,9 +488,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
     try {
       jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
     } catch (IOException e) {
-      logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+      logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
           payload);
-      logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+      logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
           "");
     }
 
@@ -523,7 +533,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
   private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
       String entityType, String fieldName) {
 
-    DynamicType entity = (DynamicType) oxmJaxbContext.getDynamicType(oxmEntityType);
+    DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
     if (entity == null) {
       return null;
     }
@@ -634,8 +644,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
 
       String entityId = eventEntity.getId();
 
-      if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
-          || action.equalsIgnoreCase(ACTION_UPDATE)) {
+      if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+          || action.equalsIgnoreCase(UPDATE)) {
 
         // Run the GET to retrieve the ETAG from the search service
         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -653,11 +663,11 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
         // Write the entity to the search service.
         // PUT
         searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
-      } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+      } else if (action.equalsIgnoreCase(CREATE)) {
         // Write the entry to the search service.
         searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
 
-      } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+      } else if (action.equalsIgnoreCase(DELETE)) {
         // Run the GET to retrieve the ETAG from the search service
         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
 
@@ -670,6 +680,13 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
           }
 
+          /*
+           * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+           * we specify a Content-Type.
+           */
+
+          headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
+
           searchAgent.deleteDocument(index, eventEntity.getId(), headers);
         } else {
           logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
@@ -682,4 +699,228 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
           action);
     }
   }
+  
+  protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
+
+    SpikeEventMeta meta = new SpikeEventMeta();
+    Object eventPayloadObj = null;
+    String eventPayload = null;
+    try {
+       eventPayloadObj = exchange.getIn().getBody();    
+      
+      /*
+       * It is expected that mainJson will have multiple top level objects: - header - body - result
+       */
+      if (eventPayloadObj == null) {
+        returnWithError(exchange, null, "Invalid Payload");
+        return null;
+      }
+      
+      eventPayload = (String)eventPayloadObj;
+       
+      meta.setEventEntity(new JSONObject(eventPayload));
+    } catch (JSONException exc) {
+      returnWithError(exchange, eventPayload, "Invalid Payload");
+      return null;
+    }
+
+    JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
+
+    if (eventHeader == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+      return null;
+    }
+
+    meta.setEventHeader(eventHeader);
+
+    /*
+     * Only process SPIKE update-notification events
+     */
+
+    final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+    final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
+
+    if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+      // drop event
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+          + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+      /*
+       * I don't think ignoring a non-applicable event constitutes a failure.
+       */
+
+      setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+      return null;
+    }
+
+    JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
+
+    if (eventBody == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+      return null;
+    }
+
+    meta.setEventBody(eventBody);
+
+    String action = eventBody.getString(OPERATION_KEY);
+    if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+          "Unrecognized action '" + action + "'", eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Unrecognized action '" + action + "'");
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+    meta.setBodyOperationType(action);
+
+    // Load the event body data, any errors will result in a failure and discard
+
+    JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+    if (spikeVertex == null) {
+      returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
+      return null;
+    }
+
+    meta.setSpikeVertex(spikeVertex);
+
+    SpikeEventVertex spikeEventVertex = null;
+    try {
+      spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
+    } catch (JSONException exc) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+          "Error initializating spike event.  Error: " + exc.getMessage(), eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Error initializating spike event.  Error: " + exc.getMessage());
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+
+    meta.setSpikeEventVertex(spikeEventVertex);
+
+    DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
+    if (oxmJaxbContext == null) {
+      logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
+          eventPayload);
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+
+    meta.setOxmJaxbContext(oxmJaxbContext);
+
+    String entityType = spikeEventVertex.getType();
+    if (entityType == null || entityType.isEmpty()) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+          "Payload header missing entity type", eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Payload header missing entity type");
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+    
+    /*
+     * test if entityType is in the model
+     */
+
+    VersionedOxmEntities oxmEntities =
+    EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
+    
+    if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
+          eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "No matching OXM Descriptor for entity-type='" + entityType + "'");
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+
+    
+    String entityKey = spikeEventVertex.getKey();
+    if (entityKey == null || entityKey.isEmpty()) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
+          eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Payload vertex missing entity key");
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+    String entityLink = spikeEventVertex.getEntityLink();
+    if (entityLink == null || entityLink.isEmpty()) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+          "Payload header missing entity link", eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Payload header missing entity link");
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+
+    JSONObject vertexProperties = null;
+    try {
+
+      vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
+
+    } catch (JSONException exc) {
+      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+          "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
+      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+          "Payload header missing " + VERTEX_PROPERTIES_KEY);
+
+      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+      return null;
+    }
+
+    meta.setVertexProperties(vertexProperties);
+
+    // log the fact that all data are in good shape
+    logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
+    logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
+        eventPayload);
+
+    return meta;
+
+  }
+  
+  protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
+
+    /*
+     * These are all critical keys
+     */
+
+    final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+    final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+    final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
+
+    SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+    eventVertex.setSchemaVersion(vertexSchemaVersion);
+    logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
+
+    return eventVertex;
+
+  }
+  
+  protected List<String> extractSuggestableAttr(VersionedOxmEntities oxmEntities, String entityType) {
+    // Extract suggestable attributeshandleTopographicalData
+    Map<String, OxmEntityDescriptor> rootDescriptor = oxmEntities.getSuggestableEntityDescriptors();
+
+    if (rootDescriptor == null) {
+      return Collections.emptyList();
+    }
+
+    OxmEntityDescriptor desc = rootDescriptor.get(entityType);
+
+    if (desc == null) {
+      return Collections.emptyList();
+    }
+
+    return desc.getSuggestableAttributes();
+  }
+  
 }