+
+ protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
+
+ SpikeEventMeta meta = new SpikeEventMeta();
+ Object eventPayloadObj = null;
+ String eventPayload = null;
+ try {
+ eventPayloadObj = exchange.getIn().getBody();
+
+ /*
+ * It is expected that mainJson will have multiple top level objects: - header - body - result
+ */
+ if (eventPayloadObj == null) {
+ returnWithError(exchange, null, "Invalid Payload");
+ return null;
+ }
+
+ eventPayload = (String)eventPayloadObj;
+
+ meta.setEventEntity(new JSONObject(eventPayload));
+ } catch (JSONException exc) {
+ returnWithError(exchange, eventPayload, "Invalid Payload");
+ return null;
+ }
+
+ JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
+
+ if (eventHeader == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+ return null;
+ }
+
+ meta.setEventHeader(eventHeader);
+
+ /*
+ * Only process SPIKE update-notification events
+ */
+
+ final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+ final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
+
+ if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+ // drop event
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+ + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+ /*
+ * I don't think ignoring a non-applicable event constitutes a failure.
+ */
+
+ setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+ return null;
+ }
+
+ JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
+
+ if (eventBody == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+ return null;
+ }
+
+ meta.setEventBody(eventBody);
+
+ String action = eventBody.getString(OPERATION_KEY);
+ if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Unrecognized action '" + action + "'", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Unrecognized action '" + action + "'");
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ meta.setBodyOperationType(action);
+
+ // Load the event body data, any errors will result in a failure and discard
+
+ JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+ if (spikeVertex == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
+ return null;
+ }
+
+ meta.setSpikeVertex(spikeVertex);
+
+ SpikeEventVertex spikeEventVertex = null;
+ try {
+ spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage(), eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage());
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setSpikeEventVertex(spikeEventVertex);
+
+ DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
+ if (oxmJaxbContext == null) {
+ logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
+ eventPayload);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setOxmJaxbContext(oxmJaxbContext);
+
+ String entityType = spikeEventVertex.getType();
+ if (entityType == null || entityType.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity type", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity type");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ /*
+ * test if entityType is in the model
+ */
+
+ VersionedOxmEntities oxmEntities =
+ EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
+
+ if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "No matching OXM Descriptor for entity-type='" + entityType + "'");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+
+ String entityKey = spikeEventVertex.getKey();
+ if (entityKey == null || entityKey.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload vertex missing entity key");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ String entityLink = spikeEventVertex.getEntityLink();
+ if (entityLink == null || entityLink.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity link", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity link");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ JSONObject vertexProperties = null;
+ try {
+
+ vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
+
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setVertexProperties(vertexProperties);
+
+ // log the fact that all data are in good shape
+ logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
+ logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
+ eventPayload);
+
+ return meta;
+
+ }
+
+ protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
+
+ /*
+ * These are all critical keys
+ */
+
+ final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+ final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+ final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
+
+ SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+ eventVertex.setSchemaVersion(vertexSchemaVersion);
+ logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
+
+ return eventVertex;
+
+ }
+