import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.eclipse.persistence.dynamic.DynamicType;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.datarouter.entity.SpikeEventEntity;
import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
+import org.onap.aai.datarouter.entity.SpikeEventEntity;
import org.onap.aai.datarouter.entity.SpikeEventVertex;
-import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
-import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
-import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
-import org.onap.aai.datarouter.util.OxmModelLoader;
import org.onap.aai.datarouter.util.RouterServiceUtil;
import org.onap.aai.datarouter.util.SearchServiceAgent;
+import org.onap.aai.entity.OxmEntityDescriptor;
+import org.onap.aai.util.EntityOxmReferenceHelper;
+import org.onap.aai.util.ExternalOxmModelProcessor;
+import org.onap.aai.schema.OxmModelLoader;
import org.onap.aai.restclient.client.Headers;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.restclient.rest.HttpUtil;
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
-
- private static final String ACTION_CREATE = "create";
- private static final String EVENT_VERTEX = "vertex";
- private static final String ACTION_DELETE = "delete";
- private static final String ACTION_UPDATE = "update";
+ /**
+ * Note (8-June-2018):
+ *
+ * At present we don't need to support every event-type that could be present in the spike-events.
+ * The only one we want is a SPIKE "update-notification". In the future perhaps we need to add some
+ * configurability to the camel-route itself with a json camel filtering component so that routing
+ * logic can be modified as part of the camel route spring-xml instead of hard-coding the
+ * event filtering in here.
+ */
+
private static final String PROCESS_SPIKE_EVENT = "Process Spike Event";
+
+ private static final String UPDATE_NOTIFICATION = "update-notification";
+ private static final String SPIKE = "SPIKE";
+
+ private static final String HEADER_KEY = "header";
+ private static final String EVENT_TYPE_KEY = "event-type";
+ private static final String SOURCE_NAME_KEY = "source-name";
+ private static final String BODY_KEY = "body";
private static final String OPERATION_KEY = "operation";
+ private static final String VERTEX_KEY = "vertex";
+ private static final String VERTEX_KEY_KEY = "key";
+ private static final String VERTEX_TYPE_KEY = "type";
+ private static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+
+ private static final String CREATE = "create";
+ private static final String DELETE = "delete";
+ private static final String UPDATE = "update";
private static final List<String> SUPPORTED_ACTIONS =
- Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+ Arrays.asList(CREATE, UPDATE, DELETE);
Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
private String oxmVersion = null;
private void parseLatestOxmVersion() {
int latestVersion = -1;
if (oxmVersionContextMap != null) {
- Iterator it = oxmVersionContextMap.entrySet().iterator();
+ Iterator<Entry<String, DynamicJAXBContext>> it = oxmVersionContextMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry pair = (Map.Entry) it.next();
logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
}
+
/**
* Convert object to json.
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
}
- public boolean isJSONValid(String test) {
+ @Override
+ public void process(Exchange exchange) /*throws Exception*/ {
+
+ long startTime = System.currentTimeMillis();
+
+ final String eventPayload = exchange.getIn().getBody().toString();
+ JSONObject mainJson = null;
+
try {
- new JSONObject(test);
- } catch (JSONException ex) {
- return false;
+
+ /*
+ * It is expected that mainJson will have multiple top level objects:
+ * - header
+ * - body
+ * - result
+ */
+
+ mainJson = new JSONObject(eventPayload);
+ } catch (JSONException exc) {
+ returnWithError(exchange, eventPayload, "Invalid Payload");
+ return;
}
- return true;
- }
+
+ JSONObject eventHeader = mainJson.getJSONObject(HEADER_KEY);
- @Override
- public void process(Exchange exchange) throws Exception {
+ if (eventHeader == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+ return;
+ }
+
+ /*
+ * Only process SPIKE update-notification events
+ */
+
+ final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+ final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
- long startTime = System.currentTimeMillis();
- String uebPayload = exchange.getIn().getBody().toString();
- if (uebPayload == null || !isJSONValid(uebPayload)) {
- uebPayload = exchange.getIn().getBody(String.class);
- if (uebPayload == null || !isJSONValid(uebPayload)) {
- returnWithError(exchange, uebPayload, "Invalid Payload");
- return;
- }
+ if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+ // drop event
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+ + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+ /*
+ * I don't think ignoring a non-applicable event constitutes a failure.
+ */
+
+ setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+ return;
}
+
+ JSONObject eventBody = mainJson.getJSONObject(BODY_KEY);
+ if (eventBody == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+ return;
+ }
- JSONObject mainJson = new JSONObject(uebPayload);
- String action = mainJson.getString(OPERATION_KEY);
+ String action = eventBody.getString(OPERATION_KEY);
if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Unrecognized action '" + action + "'", uebPayload);
+ "Unrecognized action '" + action + "'", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Unrecognized action '" + action + "'");
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
- // Load the UEB payload data, any errors will result in a failure and discard
-
- JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
- if (spikeObjVertex == null) {
- returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+ // Load the event body data, any errors will result in a failure and discard
+
+ JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+ if (spikeVertex == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
return;
}
- SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
+ SpikeEventVertex eventVertex = null;
+ try {
+ eventVertex = initializeSpikeEventVertex(spikeVertex);
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Error initializating spike event. Error: " + exc.getMessage(),
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage());
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return;
+ }
- DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
+ DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
if (oxmJaxbContext == null) {
logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
- logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
-
-
String entityType = eventVertex.getType();
if (entityType == null || entityType.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload header missing entity type", uebPayload);
+ "Payload header missing entity type", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload header missing entity type");
String entityKey = eventVertex.getKey();
if (entityKey == null || entityKey.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
- uebPayload);
+ eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload vertex missing entity key");
String entityLink = eventVertex.getEntityLink();
if (entityLink == null || entityLink.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload header missing entity link", uebPayload);
+ "Payload header missing entity link", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload header missing entity link");
// log the fact that all data are in good shape
logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
- uebPayload);
-
+ eventPayload);
// Process for building SpikeEventEntity object
- String[] entityTypeArr = entityType.split("-");
- String oxmEntityType = "";
- for (String entityWord : entityTypeArr) {
- oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1);
- }
-
+ String oxmEntityType = new OxmEntityTypeConverter().convert(entityType);
+
List<String> searchableAttr =
getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
if (searchableAttr == null) {
"Searchable attribute not found for payload entity type '" + entityType + "'");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
"Searchable attribute not found for payload entity type '" + entityType + "'",
- uebPayload);
+ eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
String entityPrimaryKeyFieldName =
- getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+ getEntityPrimaryKeyFieldName(oxmJaxbContext, eventPayload, oxmEntityType, entityType);
if (entityPrimaryKeyFieldName == null) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary key attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing primary key attribute", uebPayload);
+ "Payload missing primary key attribute", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
- String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
- if (entityPrimaryKeyFieldValue.isEmpty()) {
+ String entityPrimaryKeyFieldValue = lookupValueUsingKey(eventPayload, entityPrimaryKeyFieldName);
+ if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary value attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing primary value attribute", uebPayload);
+ "Payload missing primary value attribute", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldValue);
spikeEventEntity.setEntityType(entityType);
spikeEventEntity.setLink(entityLink);
+
+ System.out.println(spikeEventEntity);
- if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
+ if (!getSearchTags(spikeEventEntity, searchableAttr, eventPayload, action)) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing searchable attribute for entity type '" + entityType + "'");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
+ "Payload missing searchable attribute for entity type '" + entityType + "'", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
} catch (NoSuchAlgorithmException e) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
- uebPayload);
+ eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
exchange.getOut().setBody(additionalInfo);
}
+ private SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
- /*
- * Load the UEB JSON payload, any errors would result to a failure case response.
- */
- private JSONObject getUebContentAsJson(String payload, String contentKey) {
-
- JSONObject uebJsonObj;
- JSONObject uebObjContent;
-
- try {
- uebJsonObj = new JSONObject(payload);
- } catch (JSONException e) {
- logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
- logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
- return null;
- }
-
- if (uebJsonObj.has(contentKey)) {
- uebObjContent = uebJsonObj.getJSONObject(contentKey);
- } else {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
- return null;
- }
-
- return uebObjContent;
- }
-
-
- private SpikeEventVertex initializeSpikeEventVertex(String payload) {
-
- SpikeEventVertex eventVertex = null;
- ObjectMapper mapper = new ObjectMapper();
-
- // Make sure that were were actually passed in a valid string.
- if (payload == null || payload.isEmpty()) {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+ /*
+ * These are all critical keys
+ */
- return eventVertex;
- }
+ final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+ final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+ final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
- // Marshal the supplied string into a UebEventHeader object.
- try {
- eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
- } catch (JsonProcessingException e) {
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
- } catch (Exception e) {
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
- }
-
- if (eventVertex != null) {
- logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
- }
+ SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+ eventVertex.setSchemaVersion(vertexSchemaVersion);
+ logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
return eventVertex;
try {
jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
} catch (IOException e) {
- logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
payload);
- logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
"");
}
private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
String entityType, String fieldName) {
- DynamicType entity = (DynamicType) oxmJaxbContext.getDynamicType(oxmEntityType);
+ DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
if (entity == null) {
return null;
}
String entityId = eventEntity.getId();
- if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
- || action.equalsIgnoreCase(ACTION_UPDATE)) {
+ if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+ || action.equalsIgnoreCase(UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
// Write the entity to the search service.
// PUT
searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+ } else if (action.equalsIgnoreCase(CREATE)) {
// Write the entry to the search service.
searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+ } else if (action.equalsIgnoreCase(DELETE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
}
+
+ /*
+ * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+ * we specify a Content-Type.
+ */
+
+ headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {