X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Faai%2Fdatarouter%2Fpolicy%2FAbstractSpikeEntityEventProcessor.java;h=50d344b25a48566583b6e37c2578ede9a39bd46f;hb=6015c2cc7d1cd28e91b5a9cb5aed5366c6c29290;hp=9627365cd1730f4a6ee76ec7a53399d6fdd71f5c;hpb=bf99eb77b31a4cfbc590762cc6ba669820c21439;p=aai%2Fdata-router.git diff --git a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java index 9627365..50d344b 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java +++ b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java @@ -26,6 +26,7 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -36,21 +37,25 @@ import org.apache.camel.Processor; import org.eclipse.persistence.dynamic.DynamicType; import org.eclipse.persistence.internal.helper.DatabaseField; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.oxm.MediaType; import org.json.JSONException; import org.json.JSONObject; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; +import org.onap.aai.schema.OxmModelLoader; import org.onap.aai.datarouter.entity.DocumentStoreDataEntity; -import org.onap.aai.datarouter.entity.OxmEntityDescriptor; import org.onap.aai.datarouter.entity.SpikeEventEntity; +import org.onap.aai.datarouter.entity.SpikeEventMeta; import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; -import org.onap.aai.datarouter.util.EntityOxmReferenceHelper; -import org.onap.aai.datarouter.util.ExternalOxmModelProcessor; -import org.onap.aai.datarouter.util.OxmModelLoader; import org.onap.aai.datarouter.util.RouterServiceUtil; import org.onap.aai.datarouter.util.SearchServiceAgent; +import org.onap.aai.entity.OxmEntityDescriptor; +import org.onap.aai.util.EntityOxmReferenceHelper; +import org.onap.aai.util.ExternalOxmModelProcessor; +import org.onap.aai.util.Version; +import org.onap.aai.util.VersionedOxmEntities; import org.onap.aai.restclient.client.Headers; import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.restclient.rest.HttpUtil; @@ -59,30 +64,41 @@ import org.slf4j.MDC; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; public abstract class AbstractSpikeEntityEventProcessor implements Processor { protected static final String additionalInfo = "Response of SpikeEntityEventPolicy"; private Collection externalOxmModelProcessors; - - protected final String ACTION_CREATE = "create"; - private final String EVENT_VERTEX = "vertex"; - public final static String ACTION_DELETE = "delete"; - protected final String ACTION_UPDATE = "update"; protected final String PROCESS_SPIKE_EVENT = "Process Spike Event"; - private final String OPERATION_KEY = "operation"; + protected static final String UPDATE_NOTIFICATION = "update-notification"; + protected static final String SPIKE = "SPIKE"; + + protected static final String HEADER_KEY = "header"; + protected static final String EVENT_TYPE_KEY = "event-type"; + protected static final String SOURCE_NAME_KEY = "source-name"; + protected static final String BODY_KEY = "body"; + protected static final String OPERATION_KEY = "operation"; + + protected static final String VERTEX_KEY = "vertex"; + protected static final String VERTEX_PROPERTIES_KEY = "properties"; + protected static final String VERTEX_KEY_KEY = "key"; + protected static final String VERTEX_TYPE_KEY = "type"; + protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version"; + + protected static final String CREATE = "create"; + protected static final String DELETE = "delete"; + protected static final String UPDATE = "update"; + protected enum ResponseType { SUCCESS, PARTIAL_SUCCESS, FAILURE; } - private final List SUPPORTED_ACTIONS = - Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE); + protected final List SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE); - Map oxmVersionContextMap = new HashMap<>(); - private String oxmVersion = null; + protected Map oxmVersionContextMap = new HashMap<>(); + protected String oxmVersion = null; /** Agent for communicating with the Search Service. */ protected SearchServiceAgent searchAgent = null; @@ -268,9 +284,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Load the UEB payload data, any errors will result in a failure and discard - JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX); + JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY); if (spikeObjVertex == null) { - returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX); + returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY); return null; } @@ -279,7 +295,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { } protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) { - DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase()); + DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion); if (oxmJaxbContext == null) { logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload); @@ -336,7 +352,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { String action, String uebPayload, String oxmEntityType, List searchableAttr) { String entityPrimaryKeyFieldName = - getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType); + getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType); if (entityPrimaryKeyFieldName == null) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary key attribute"); @@ -346,7 +362,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { return null; } String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName); - if (entityPrimaryKeyFieldValue.isEmpty()) { + if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary value attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, @@ -391,13 +407,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { protected String getOxmEntityType(String entityType) { - - String[] entityTypeArr = entityType.split("-"); - String oxmEntityType = ""; - for (String entityWord : entityTypeArr) { - oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1); - } - return oxmEntityType; + return new OxmEntityTypeConverter().convert(entityType); } protected List getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType, @@ -425,8 +435,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Make sure that were were actually passed in a valid string. if (payload == null || payload.isEmpty()) { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); + logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY); + logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY); return eventVertex; } @@ -448,7 +458,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { } - private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String payload, + private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String oxmEntityType, String entityType) { DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType); @@ -478,9 +488,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { try { jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload)); } catch (IOException e) { - logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", payload); - logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", ""); } @@ -523,7 +533,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { private List getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType, String entityType, String fieldName) { - DynamicType entity = (DynamicType) oxmJaxbContext.getDynamicType(oxmEntityType); + DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType); if (entity == null) { return null; } @@ -634,8 +644,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { String entityId = eventEntity.getId(); - if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null) - || action.equalsIgnoreCase(ACTION_UPDATE)) { + if ((action.equalsIgnoreCase(CREATE) && entityId != null) + || action.equalsIgnoreCase(UPDATE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -653,11 +663,11 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Write the entity to the search service. // PUT searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_CREATE)) { + } else if (action.equalsIgnoreCase(CREATE)) { // Write the entry to the search service. searchAgent.postDocument(index, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_DELETE)) { + } else if (action.equalsIgnoreCase(DELETE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -670,6 +680,13 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } + /* + * The Spring-Boot version of the search-data-service rejects the DELETE operation unless + * we specify a Content-Type. + */ + + headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType())); + searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); @@ -682,4 +699,228 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { action); } } + + protected SpikeEventMeta processSpikeEvent(Exchange exchange) { + + SpikeEventMeta meta = new SpikeEventMeta(); + Object eventPayloadObj = null; + String eventPayload = null; + try { + eventPayloadObj = exchange.getIn().getBody(); + + /* + * It is expected that mainJson will have multiple top level objects: - header - body - result + */ + if (eventPayloadObj == null) { + returnWithError(exchange, null, "Invalid Payload"); + return null; + } + + eventPayload = (String)eventPayloadObj; + + meta.setEventEntity(new JSONObject(eventPayload)); + } catch (JSONException exc) { + returnWithError(exchange, eventPayload, "Invalid Payload"); + return null; + } + + JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY); + + if (eventHeader == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY); + return null; + } + + meta.setEventHeader(eventHeader); + + /* + * Only process SPIKE update-notification events + */ + + final String sourceName = eventHeader.getString(SOURCE_NAME_KEY); + final String eventType = eventHeader.getString(EVENT_TYPE_KEY); + + if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) { + // drop event + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='" + + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'."); + + /* + * I don't think ignoring a non-applicable event constitutes a failure. + */ + + setResponse(exchange, ResponseType.SUCCESS, additionalInfo); + return null; + } + + JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY); + + if (eventBody == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY); + return null; + } + + meta.setEventBody(eventBody); + + String action = eventBody.getString(OPERATION_KEY); + if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Unrecognized action '" + action + "'", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Unrecognized action '" + action + "'"); + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + meta.setBodyOperationType(action); + + // Load the event body data, any errors will result in a failure and discard + + JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY); + if (spikeVertex == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY); + return null; + } + + meta.setSpikeVertex(spikeVertex); + + SpikeEventVertex spikeEventVertex = null; + try { + spikeEventVertex = initializeSpikeEventVertex(spikeVertex); + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Error initializating spike event. Error: " + exc.getMessage(), eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Error initializating spike event. Error: " + exc.getMessage()); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setSpikeEventVertex(spikeEventVertex); + + DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion); + if (oxmJaxbContext == null) { + logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion); + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", + eventPayload); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setOxmJaxbContext(oxmJaxbContext); + + String entityType = spikeEventVertex.getType(); + if (entityType == null || entityType.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing entity type", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing entity type"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + /* + * test if entityType is in the model + */ + + VersionedOxmEntities oxmEntities = + EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase())); + + if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'", + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "No matching OXM Descriptor for entity-type='" + entityType + "'"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + + String entityKey = spikeEventVertex.getKey(); + if (entityKey == null || entityKey.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key", + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload vertex missing entity key"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + String entityLink = spikeEventVertex.getEntityLink(); + if (entityLink == null || entityLink.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing entity link", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing entity link"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + JSONObject vertexProperties = null; + try { + + vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY); + + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing " + VERTEX_PROPERTIES_KEY); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setVertexProperties(vertexProperties); + + // log the fact that all data are in good shape + logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); + logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, + eventPayload); + + return meta; + + } + + protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException { + + /* + * These are all critical keys + */ + + final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY); + final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY); + final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY); + + SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey); + eventVertex.setSchemaVersion(vertexSchemaVersion); + logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); + + return eventVertex; + + } + + protected List extractSuggestableAttr(VersionedOxmEntities oxmEntities, String entityType) { + // Extract suggestable attributeshandleTopographicalData + Map rootDescriptor = oxmEntities.getSuggestableEntityDescriptors(); + + if (rootDescriptor == null) { + return Collections.emptyList(); + } + + OxmEntityDescriptor desc = rootDescriptor.get(entityType); + + if (desc == null) { + return Collections.emptyList(); + } + + return desc.getSuggestableAttributes(); + } + }