X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fdatarouter%2Fpolicy%2FSpikeEntityEventPolicy.java;h=8f0e426605ebbe0188412275b6531dfce0a3fb5e;hb=f518411876d6830d83d051a915f0b21bdfdcb43e;hp=36bb14298852ed0b55a45a9d3cbc31f1c48a0eb0;hpb=ea4f5f20de7100b1952f2a308af5cf1d7100d019;p=aai%2Fdata-router.git diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java index 36bb142..8f0e426 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java +++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java @@ -30,25 +30,27 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.eclipse.persistence.dynamic.DynamicType; import org.eclipse.persistence.internal.helper.DatabaseField; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.oxm.MediaType; import org.json.JSONException; import org.json.JSONObject; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.datarouter.entity.SpikeEventEntity; import org.onap.aai.datarouter.entity.DocumentStoreDataEntity; -import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.entity.OxmEntityDescriptor; +import org.onap.aai.datarouter.entity.SpikeEventEntity; +import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; import org.onap.aai.datarouter.util.EntityOxmReferenceHelper; import org.onap.aai.datarouter.util.ExternalOxmModelProcessor; -import org.onap.aai.datarouter.util.OxmModelLoader; +import org.onap.aai.datarouter.schema.OxmModelLoader; import org.onap.aai.datarouter.util.RouterServiceUtil; import org.onap.aai.datarouter.util.SearchServiceAgent; import org.onap.aai.restclient.client.Headers; @@ -68,17 +70,38 @@ public class SpikeEntityEventPolicy implements Processor { private Collection externalOxmModelProcessors; - - private static final String ACTION_CREATE = "create"; - private static final String EVENT_VERTEX = "vertex"; - private static final String ACTION_DELETE = "delete"; - private static final String ACTION_UPDATE = "update"; + /** + * Note (8-June-2018): + * + * At present we don't need to support every event-type that could be present in the spike-events. + * The only one we want is a SPIKE "update-notification". In the future perhaps we need to add some + * configurability to the camel-route itself with a json camel filtering component so that routing + * logic can be modified as part of the camel route spring-xml instead of hard-coding the + * event filtering in here. + */ + private static final String PROCESS_SPIKE_EVENT = "Process Spike Event"; + + private static final String UPDATE_NOTIFICATION = "update-notification"; + private static final String SPIKE = "SPIKE"; + + private static final String HEADER_KEY = "header"; + private static final String EVENT_TYPE_KEY = "event-type"; + private static final String SOURCE_NAME_KEY = "source-name"; + private static final String BODY_KEY = "body"; private static final String OPERATION_KEY = "operation"; + private static final String VERTEX_KEY = "vertex"; + private static final String VERTEX_KEY_KEY = "key"; + private static final String VERTEX_TYPE_KEY = "type"; + private static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version"; + + private static final String CREATE = "create"; + private static final String DELETE = "delete"; + private static final String UPDATE = "update"; private static final List SUPPORTED_ACTIONS = - Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE); + Arrays.asList(CREATE, UPDATE, DELETE); Map oxmVersionContextMap = new HashMap<>(); private String oxmVersion = null; @@ -122,7 +145,7 @@ public class SpikeEntityEventPolicy implements Processor { private void parseLatestOxmVersion() { int latestVersion = -1; if (oxmVersionContextMap != null) { - Iterator it = oxmVersionContextMap.entrySet().iterator(); + Iterator> it = oxmVersionContextMap.entrySet().iterator(); while (it.hasNext()) { Map.Entry pair = (Map.Entry) it.next(); @@ -148,6 +171,7 @@ public class SpikeEntityEventPolicy implements Processor { logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED); } + /** * Convert object to json. @@ -177,65 +201,109 @@ public class SpikeEntityEventPolicy implements Processor { setResponse(exchange, ResponseType.FAILURE, additionalInfo); } - public boolean isJSONValid(String test) { + @Override + public void process(Exchange exchange) /*throws Exception*/ { + + long startTime = System.currentTimeMillis(); + + final String eventPayload = exchange.getIn().getBody().toString(); + JSONObject mainJson = null; + try { - new JSONObject(test); - } catch (JSONException ex) { - return false; + + /* + * It is expected that mainJson will have multiple top level objects: + * - header + * - body + * - result + */ + + mainJson = new JSONObject(eventPayload); + } catch (JSONException exc) { + returnWithError(exchange, eventPayload, "Invalid Payload"); + return; } - return true; - } + + JSONObject eventHeader = mainJson.getJSONObject(HEADER_KEY); - @Override - public void process(Exchange exchange) throws Exception { + if (eventHeader == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY); + return; + } + + /* + * Only process SPIKE update-notification events + */ + + final String sourceName = eventHeader.getString(SOURCE_NAME_KEY); + final String eventType = eventHeader.getString(EVENT_TYPE_KEY); - long startTime = System.currentTimeMillis(); - String uebPayload = exchange.getIn().getBody().toString(); - if (uebPayload == null || !isJSONValid(uebPayload)) { - uebPayload = exchange.getIn().getBody(String.class); - if (uebPayload == null || !isJSONValid(uebPayload)) { - returnWithError(exchange, uebPayload, "Invalid Payload"); - return; - } + if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) { + // drop event + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='" + + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'."); + + /* + * I don't think ignoring a non-applicable event constitutes a failure. + */ + + setResponse(exchange, ResponseType.SUCCESS, additionalInfo); + return; } + + JSONObject eventBody = mainJson.getJSONObject(BODY_KEY); + if (eventBody == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY); + return; + } - JSONObject mainJson = new JSONObject(uebPayload); - String action = mainJson.getString(OPERATION_KEY); + String action = eventBody.getString(OPERATION_KEY); if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Unrecognized action '" + action + "'", uebPayload); + "Unrecognized action '" + action + "'", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Unrecognized action '" + action + "'"); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - // Load the UEB payload data, any errors will result in a failure and discard - - JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX); - if (spikeObjVertex == null) { - returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX); + // Load the event body data, any errors will result in a failure and discard + + JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY); + if (spikeVertex == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY); return; } - SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString()); + SpikeEventVertex eventVertex = null; + try { + eventVertex = initializeSpikeEventVertex(spikeVertex); + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Error initializating spike event. Error: " + exc.getMessage(), + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Error initializating spike event. Error: " + exc.getMessage()); - DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase()); + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return; + } + + DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion); if (oxmJaxbContext == null) { logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion); - logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload); + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - - String entityType = eventVertex.getType(); if (entityType == null || entityType.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload header missing entity type", uebPayload); + "Payload header missing entity type", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload header missing entity type"); @@ -246,7 +314,7 @@ public class SpikeEntityEventPolicy implements Processor { String entityKey = eventVertex.getKey(); if (entityKey == null || entityKey.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key", - uebPayload); + eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload vertex missing entity key"); @@ -256,7 +324,7 @@ public class SpikeEntityEventPolicy implements Processor { String entityLink = eventVertex.getEntityLink(); if (entityLink == null || entityLink.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload header missing entity link", uebPayload); + "Payload header missing entity link", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload header missing entity link"); @@ -267,8 +335,7 @@ public class SpikeEntityEventPolicy implements Processor { // log the fact that all data are in good shape logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, - uebPayload); - + eventPayload); // Process for building SpikeEventEntity object String[] entityTypeArr = entityType.split("-"); @@ -276,7 +343,7 @@ public class SpikeEntityEventPolicy implements Processor { for (String entityWord : entityTypeArr) { oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1); } - + List searchableAttr = getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable"); if (searchableAttr == null) { @@ -284,28 +351,28 @@ public class SpikeEntityEventPolicy implements Processor { "Searchable attribute not found for payload entity type '" + entityType + "'"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Searchable attribute not found for payload entity type '" + entityType + "'", - uebPayload); + eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } String entityPrimaryKeyFieldName = - getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType); + getEntityPrimaryKeyFieldName(oxmJaxbContext, eventPayload, oxmEntityType, entityType); if (entityPrimaryKeyFieldName == null) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary key attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing primary key attribute", uebPayload); + "Payload missing primary key attribute", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName); - if (entityPrimaryKeyFieldValue.isEmpty()) { + String entityPrimaryKeyFieldValue = lookupValueUsingKey(eventPayload, entityPrimaryKeyFieldName); + if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary value attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing primary value attribute", uebPayload); + "Payload missing primary value attribute", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -321,12 +388,14 @@ public class SpikeEntityEventPolicy implements Processor { spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldValue); spikeEventEntity.setEntityType(entityType); spikeEventEntity.setLink(entityLink); + + System.out.println(spikeEventEntity); - if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) { + if (!getSearchTags(spikeEventEntity, searchableAttr, eventPayload, action)) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing searchable attribute for entity type '" + entityType + "'"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload); + "Payload missing searchable attribute for entity type '" + entityType + "'", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -338,7 +407,7 @@ public class SpikeEntityEventPolicy implements Processor { } catch (NoSuchAlgorithmException e) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest", - uebPayload); + eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -363,60 +432,19 @@ public class SpikeEntityEventPolicy implements Processor { exchange.getOut().setBody(additionalInfo); } + private SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException { - /* - * Load the UEB JSON payload, any errors would result to a failure case response. - */ - private JSONObject getUebContentAsJson(String payload, String contentKey) { - - JSONObject uebJsonObj; - JSONObject uebObjContent; - - try { - uebJsonObj = new JSONObject(payload); - } catch (JSONException e) { - logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload); - logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload); - return null; - } - - if (uebJsonObj.has(contentKey)) { - uebObjContent = uebJsonObj.getJSONObject(contentKey); - } else { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey); - return null; - } - - return uebObjContent; - } - - - private SpikeEventVertex initializeSpikeEventVertex(String payload) { - - SpikeEventVertex eventVertex = null; - ObjectMapper mapper = new ObjectMapper(); - - // Make sure that were were actually passed in a valid string. - if (payload == null || payload.isEmpty()) { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); + /* + * These are all critical keys + */ - return eventVertex; - } + final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY); + final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY); + final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY); - // Marshal the supplied string into a UebEventHeader object. - try { - eventVertex = mapper.readValue(payload, SpikeEventVertex.class); - } catch (JsonProcessingException e) { - logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString()); - } catch (Exception e) { - logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString()); - } - - if (eventVertex != null) { - logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); - } + SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey); + eventVertex.setSchemaVersion(vertexSchemaVersion); + logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); return eventVertex; @@ -453,9 +481,9 @@ public class SpikeEntityEventPolicy implements Processor { try { jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload)); } catch (IOException e) { - logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", payload); - logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", ""); } @@ -498,7 +526,7 @@ public class SpikeEntityEventPolicy implements Processor { private List getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType, String entityType, String fieldName) { - DynamicType entity = (DynamicType) oxmJaxbContext.getDynamicType(oxmEntityType); + DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType); if (entity == null) { return null; } @@ -584,8 +612,8 @@ public class SpikeEntityEventPolicy implements Processor { String entityId = eventEntity.getId(); - if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null) - || action.equalsIgnoreCase(ACTION_UPDATE)) { + if ((action.equalsIgnoreCase(CREATE) && entityId != null) + || action.equalsIgnoreCase(UPDATE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -603,11 +631,11 @@ public class SpikeEntityEventPolicy implements Processor { // Write the entity to the search service. // PUT searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_CREATE)) { + } else if (action.equalsIgnoreCase(CREATE)) { // Write the entry to the search service. searchAgent.postDocument(index, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_DELETE)) { + } else if (action.equalsIgnoreCase(DELETE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -619,6 +647,13 @@ public class SpikeEntityEventPolicy implements Processor { } else { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } + + /* + * The Spring-Boot version of the search-data-service rejects the DELETE operation unless + * we specify a Content-Type. + */ + + headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType())); searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else {