2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.datarouter.policy;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
34 import org.apache.camel.Exchange;
35 import org.apache.camel.Processor;
36 import org.apache.http.conn.routing.RouteInfo.LayerType;
37 import org.eclipse.persistence.dynamic.DynamicType;
38 import org.eclipse.persistence.internal.helper.DatabaseField;
39 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
40 import org.eclipse.persistence.oxm.MediaType;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45 import org.onap.aai.cl.mdc.MdcContext;
46 import org.onap.aai.datarouter.schema.OxmModelLoader;
47 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
48 import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
49 import org.onap.aai.datarouter.entity.SpikeEventEntity;
50 import org.onap.aai.datarouter.entity.SpikeEventMeta;
51 import org.onap.aai.datarouter.entity.SpikeEventVertex;
52 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
53 import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
54 import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
55 import org.onap.aai.datarouter.util.RouterServiceUtil;
56 import org.onap.aai.datarouter.util.SearchServiceAgent;
57 import org.onap.aai.datarouter.util.Version;
58 import org.onap.aai.datarouter.util.VersionedOxmEntities;
59 import org.onap.aai.restclient.client.Headers;
60 import org.onap.aai.restclient.client.OperationResult;
61 import org.onap.aai.restclient.rest.HttpUtil;
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.JsonNode;
66 import com.fasterxml.jackson.databind.ObjectMapper;
68 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
/** Text this class passes to setResponse as the response body for every outcome it reports. */
protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
/** External OXM model processors registered with OxmModelLoader by the constructor. */
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
73 protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
75 protected static final String UPDATE_NOTIFICATION = "update-notification";
76 protected static final String SPIKE = "SPIKE";
78 protected static final String HEADER_KEY = "header";
79 protected static final String EVENT_TYPE_KEY = "event-type";
80 protected static final String SOURCE_NAME_KEY = "source-name";
81 protected static final String BODY_KEY = "body";
82 protected static final String OPERATION_KEY = "operation";
84 protected static final String VERTEX_KEY = "vertex";
85 protected static final String VERTEX_PROPERTIES_KEY = "properties";
86 protected static final String VERTEX_KEY_KEY = "key";
87 protected static final String VERTEX_TYPE_KEY = "type";
88 protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
90 protected static final String CREATE = "create";
91 protected static final String DELETE = "delete";
92 protected static final String UPDATE = "update";
94 protected enum ResponseType {
95 SUCCESS, PARTIAL_SUCCESS, FAILURE;
98 protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
100 protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
101 protected String oxmVersion = null;
103 /** Agent for communicating with the Search Service. */
104 protected SearchServiceAgent searchAgent = null;
105 protected String searchIndexName;
106 protected String searchIndexSchema;
107 protected String createIndexUrl;
109 protected Logger logger;
110 protected Logger metricsLogger;
111 protected ObjectMapper mapper;
114 public AbstractSpikeEntityEventProcessor(SpikeEventPolicyConfig config)
115 throws FileNotFoundException {
116 mapper = new ObjectMapper();
117 LoggerFactory loggerFactoryInstance = LoggerFactory.getInstance();
118 logger = loggerFactoryInstance.getLogger(AbstractSpikeEntityEventProcessor.class.getName());
120 loggerFactoryInstance.getMetricsLogger(AbstractSpikeEntityEventProcessor.class.getName());
122 // Instantiate the agent that we will use for interacting with the Search Service.
123 searchAgent = new SearchServiceAgent(config.getSearchCertName(), config.getSearchKeystore(),
124 config.getSearchKeystorePwd(), AbstractSpikeEntityEventProcessor
125 .concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint()),
126 config.getSearchEndpointDocuments(), logger);
128 this.externalOxmModelProcessors = new ArrayList<>();
129 this.externalOxmModelProcessors.add(EntityOxmReferenceHelper.getInstance());
130 OxmModelLoader.registerExternalOxmModelProcessors(externalOxmModelProcessors);
131 OxmModelLoader.loadModels();
132 oxmVersionContextMap = OxmModelLoader.getVersionContextMap();
133 parseLatestOxmVersion();
136 public String getCreateIndexUrl() {
137 return createIndexUrl;
141 public void setCreateIndexUrl(String createIndexUrl) {
142 this.createIndexUrl = createIndexUrl;
145 public String getSearchIndexName() {
146 return searchIndexName;
150 public void setSearchIndexName(String searchIndexName) {
151 this.searchIndexName = searchIndexName;
154 public String getSearchIndexSchema() {
155 return searchIndexSchema;
159 public void setSearchIndexSchema(String searchIndexSchema) {
160 this.searchIndexSchema = searchIndexSchema;
163 protected void startup() {
168 * Load the UEB JSON payload, any errors would result to a failure case response.
170 protected JSONObject getUebContentAsJson(String payload, String contentKey) {
172 JSONObject uebJsonObj;
173 JSONObject uebObjContent;
176 uebJsonObj = new JSONObject(payload);
177 } catch (JSONException e) {
178 logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
179 logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
183 if (uebJsonObj.has(contentKey)) {
184 uebObjContent = uebJsonObj.getJSONObject(contentKey);
186 logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
187 logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
191 return uebObjContent;
193 public abstract void process(Exchange exchange) throws Exception;
196 private void parseLatestOxmVersion() {
197 int latestVersion = -1;
198 if (oxmVersionContextMap != null) {
199 Iterator it = oxmVersionContextMap.entrySet().iterator();
200 while (it.hasNext()) {
201 Map.Entry pair = (Map.Entry) it.next();
203 String version = pair.getKey().toString();
204 int versionNum = Integer.parseInt(version.substring(1, version.length()));
206 if (versionNum > latestVersion) {
207 latestVersion = versionNum;
208 oxmVersion = pair.getKey().toString();
211 logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
214 logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
221 * This will be used in: updateSearchEntityWithCrossEntityReference not this scope Convert object
224 * @param object the object
225 * @param pretty the pretty
227 * @throws JsonProcessingException the json processing exception
229 * protected static String convertObjectToJson(Object object, boolean pretty) throws
230 * JsonProcessingException { ObjectWriter ow;
232 * if (pretty) { ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
234 * } else { ow = new ObjectMapper().writer(); }
236 * return ow.writeValueAsString(object); }
239 protected void returnWithError(Exchange exchange, String payload, String errorMsg) {
240 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, errorMsg);
241 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, errorMsg, payload);
242 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
245 private boolean isJSONValid(String test) {
247 new JSONObject(test);
248 } catch (JSONException ex) {
256 protected String getSpikeEventAction(Exchange exchange, String uebPayload) {
257 JSONObject mainJson = new JSONObject(uebPayload);
258 String action = mainJson.getString(OPERATION_KEY);
259 if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
260 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
261 "Unrecognized action '" + action + "'", uebPayload);
262 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
263 "Unrecognized action '" + action + "'");
264 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
270 protected String getExchangeBody(Exchange exchange) {
271 String uebPayload = exchange.getIn().getBody().toString();
272 if (uebPayload == null || !isJSONValid(uebPayload)) {
273 uebPayload = exchange.getIn().getBody(String.class);
274 if (uebPayload == null || !isJSONValid(uebPayload)) {
275 returnWithError(exchange, uebPayload, "Invalid Payload");
282 protected SpikeEventVertex populateEventVertex(Exchange exchange, String uebPayload)
285 // Load the UEB payload data, any errors will result in a failure and discard
287 JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
288 if (spikeObjVertex == null) {
289 returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
293 SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
297 protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
298 DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
299 if (oxmJaxbContext == null) {
300 logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
301 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
303 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
306 return oxmJaxbContext;
310 protected String getEntityType(Exchange exchange, SpikeEventVertex eventVertex,
313 String entityType = eventVertex.getType();
314 if (entityType == null || entityType.isEmpty()) {
315 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
316 "Payload header missing entity type", uebPayload);
317 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
318 "Payload header missing entity type");
320 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
328 protected String getEntityLink(Exchange exchange, SpikeEventVertex eventVertex,
330 String entityKey = eventVertex.getKey();
331 if (entityKey == null || entityKey.isEmpty()) {
332 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
334 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
335 "Payload vertex missing entity key");
337 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
340 //EntityLink never can be null if entityKey is not null. no need to check
341 return eventVertex.getEntityLink();
348 * Use the OXM Model to determine the primary key field name based on the entity-type
350 protected SpikeEventEntity populateSpikeEventEntity(Exchange exchange,
351 SpikeEventEntity spikeEventEntity, DynamicJAXBContext oxmJaxbContext, String entityType,
352 String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
354 String entityPrimaryKeyFieldName =
355 getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
356 if (entityPrimaryKeyFieldName == null) {
357 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
358 "Payload missing primary key attribute");
359 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
360 "Payload missing primary key attribute", uebPayload);
361 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
364 String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
365 if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
366 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
367 "Payload missing primary value attribute");
368 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
369 "Payload missing primary value attribute", uebPayload);
371 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
376 if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
377 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
378 "Payload missing searchable attribute for entity type '" + entityType + "'");
379 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
380 "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
382 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
385 spikeEventEntity.setEntityPrimaryKeyName(entityPrimaryKeyFieldName);
386 spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldName);
389 spikeEventEntity.deriveFields();
391 } catch (NoSuchAlgorithmException e) {
392 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
393 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
396 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
399 return spikeEventEntity;
402 protected void setResponse(Exchange exchange, ResponseType responseType, String additionalInfo) {
404 exchange.getOut().setHeader("ResponseType", responseType.toString());
405 exchange.getOut().setBody(additionalInfo);
409 protected String getOxmEntityType(String entityType) {
410 return new OxmEntityTypeConverter().convert(entityType);
413 protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
414 String entityType, String uebPayload,Exchange exchange) {
415 List<String> searchableAttr =
416 getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
417 if (searchableAttr == null) {
418 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
419 "Searchable attribute not found for payload entity type '" + entityType + "'");
420 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
421 "Searchable attribute not found for payload entity type '" + entityType + "'",
424 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
427 return searchableAttr;
431 private SpikeEventVertex initializeSpikeEventVertex(String payload) {
433 SpikeEventVertex eventVertex = null;
434 ObjectMapper mapper = new ObjectMapper();
436 // Make sure that were were actually passed in a valid string.
437 if (payload == null || payload.isEmpty()) {
438 logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
439 logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
444 // Marshal the supplied string into a UebEventHeader object.
446 eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
447 } catch (JsonProcessingException e) {
448 logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
449 } catch (Exception e) {
450 logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
453 if (eventVertex != null) {
454 logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
461 private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
462 String oxmEntityType, String entityType) {
464 DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
465 if (entity == null) {
469 List<DatabaseField> list = entity.getDescriptor().getPrimaryKeyFields();
470 if (list != null && !list.isEmpty()) {
471 String keyName = list.get(0).getName();
472 return keyName.substring(0, keyName.indexOf('/'));
478 private String lookupValueUsingKey(String payload, String key) throws JSONException {
479 JsonNode jsonNode = convertToJsonNode(payload);
480 return RouterServiceUtil.recursivelyLookupJsonPayload(jsonNode, key);
484 private JsonNode convertToJsonNode(String payload) {
486 ObjectMapper mapper = new ObjectMapper();
487 JsonNode jsonNode = null;
489 jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
490 } catch (IOException e) {
491 logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
493 logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
501 private boolean getSearchTags(SpikeEventEntity spikeEventEntity, List<String> searchableAttr,
502 String payload, String action) {
504 boolean hasSearchableAttr = false;
505 for (String searchTagField : searchableAttr) {
506 String searchTagValue;
507 if (searchTagField.equalsIgnoreCase(spikeEventEntity.getEntityPrimaryKeyName())) {
508 searchTagValue = spikeEventEntity.getEntityPrimaryKeyValue();
510 searchTagValue = this.lookupValueUsingKey(payload, searchTagField);
513 if (searchTagValue != null && !searchTagValue.isEmpty()) {
514 hasSearchableAttr = true;
515 spikeEventEntity.addSearchTagWithKey(searchTagValue, searchTagField);
518 return hasSearchableAttr;
522 * Check if OXM version is available. If available, load it.
524 private DynamicJAXBContext loadOxmContext(String version) {
525 if (version == null) {
526 logger.error(EntityEventPolicyMsgs.FAILED_TO_FIND_OXM_VERSION, version);
530 return oxmVersionContextMap.get(version);
533 private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
534 String entityType, String fieldName) {
536 DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
537 if (entity == null) {
542 * Check for searchable XML tag
544 List<String> fieldValues = null;
545 Map<String, String> properties = entity.getDescriptor().getProperties();
546 for (Map.Entry<String, String> entry : properties.entrySet()) {
547 if (entry.getKey().equalsIgnoreCase(fieldName)) {
548 fieldValues = Arrays.asList(entry.getValue().split(","));
556 protected SpikeEventEntity getPopulatedEntity(JsonNode entityNode,
557 OxmEntityDescriptor resultDescriptor) {
558 SpikeEventEntity d = new SpikeEventEntity();
560 d.setEntityType(resultDescriptor.getEntityName());
562 List<String> primaryKeyValues = new ArrayList<>();
563 List<String> primaryKeyNames = new ArrayList<>();
566 for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
567 pkeyValue = RouterServiceUtil.getNodeFieldAsText(entityNode, keyName);
568 if (pkeyValue != null) {
569 primaryKeyValues.add(pkeyValue);
570 primaryKeyNames.add(keyName);
572 // logger.warn("getPopulatedDocument(), pKeyValue is null for entityType = " +
573 // resultDescriptor.getEntityName());
574 logger.error(EntityEventPolicyMsgs.PRIMARY_KEY_NULL_FOR_ENTITY_TYPE,
575 resultDescriptor.getEntityName());
579 final String primaryCompositeKeyValue = RouterServiceUtil.concatArray(primaryKeyValues, "/");
580 d.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
581 final String primaryCompositeKeyName = RouterServiceUtil.concatArray(primaryKeyNames, "/");
582 d.setEntityPrimaryKeyName(primaryCompositeKeyName);
584 final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
587 * Based on configuration, use the configured field names for this entity-Type to build a
588 * multi-value collection of search tags for elastic search entity search criteria.
592 for (String searchTagField : searchTagFields) {
593 String searchTagValue = RouterServiceUtil.getNodeFieldAsText(entityNode, searchTagField);
594 if (searchTagValue != null && !searchTagValue.isEmpty()) {
595 d.addSearchTagWithKey(searchTagValue, searchTagField);
604 // put this here until we find a better spot
606 * Helper utility to concatenate substrings of a URI together to form a proper URI.
608 * @param suburis the list of substrings to concatenate together
609 * @return the concatenated list of substrings
611 private static String concatSubUri(String... suburis) {
612 String finalUri = "";
614 for (String suburi : suburis) {
616 if (suburi != null) {
617 // Remove any leading / since we only want to append /
618 suburi = suburi.replaceFirst("^/*", "");
620 // Add a trailing / if one isn't already there
621 finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
631 * Perform create, read, update or delete (CRUD) operation on search engine's suggestive search
634 * @param eventEntity Entity/data to use in operation
635 * @param action The operation to perform
637 protected void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
641 Map<String, List<String>> headers = new HashMap<>();
642 headers.put(Headers.FROM_APP_ID, Arrays.asList("DataLayer"));
643 headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
645 String entityId = eventEntity.getId();
647 if ((action.equalsIgnoreCase(CREATE) && entityId != null)
648 || action.equalsIgnoreCase(UPDATE)) {
650 // Run the GET to retrieve the ETAG from the search service
651 OperationResult storedEntity = searchAgent.getDocument(index, entityId);
653 if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
654 List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
656 if (etag != null && !etag.isEmpty()) {
657 headers.put(Headers.IF_MATCH, etag);
659 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
663 // Write the entity to the search service.
665 searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
666 } else if (action.equalsIgnoreCase(CREATE)) {
667 // Write the entry to the search service.
668 searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
670 } else if (action.equalsIgnoreCase(DELETE)) {
671 // Run the GET to retrieve the ETAG from the search service
672 OperationResult storedEntity = searchAgent.getDocument(index, entityId);
674 if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
675 List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
677 if (etag != null && !etag.isEmpty()) {
678 headers.put(Headers.IF_MATCH, etag);
680 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
684 * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
685 * we specify a Content-Type.
688 headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
690 searchAgent.deleteDocument(index, eventEntity.getId(), headers);
692 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
695 logger.error(EntityEventPolicyMsgs.ENTITY_OPERATION_NOT_SUPPORTED, action);
697 } catch (IOException e) {
698 logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, eventEntity.getId(),
703 protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
705 SpikeEventMeta meta = new SpikeEventMeta();
706 Object eventPayloadObj = null;
707 String eventPayload = null;
709 eventPayloadObj = exchange.getIn().getBody();
712 * It is expected that mainJson will have multiple top level objects: - header - body - result
714 if (eventPayloadObj == null) {
715 returnWithError(exchange, null, "Invalid Payload");
719 eventPayload = (String)eventPayloadObj;
721 meta.setEventEntity(new JSONObject(eventPayload));
722 } catch (JSONException exc) {
723 returnWithError(exchange, eventPayload, "Invalid Payload");
727 JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
729 if (eventHeader == null) {
730 returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
734 meta.setEventHeader(eventHeader);
737 * Only process SPIKE update-notification events
740 final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
741 final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
743 if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
745 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
746 + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
747 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
748 "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
751 * I don't think ignoring a non-applicable event constitutes a failure.
754 setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
758 JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
760 if (eventBody == null) {
761 returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
765 meta.setEventBody(eventBody);
767 String action = eventBody.getString(OPERATION_KEY);
768 if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
769 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
770 "Unrecognized action '" + action + "'", eventPayload);
771 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
772 "Unrecognized action '" + action + "'");
773 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
776 meta.setBodyOperationType(action);
778 // Load the event body data, any errors will result in a failure and discard
780 JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
781 if (spikeVertex == null) {
782 returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
786 meta.setSpikeVertex(spikeVertex);
788 SpikeEventVertex spikeEventVertex = null;
790 spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
791 } catch (JSONException exc) {
792 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
793 "Error initializating spike event. Error: " + exc.getMessage(), eventPayload);
794 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
795 "Error initializating spike event. Error: " + exc.getMessage());
797 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
801 meta.setSpikeEventVertex(spikeEventVertex);
803 DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
804 if (oxmJaxbContext == null) {
805 logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
806 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
809 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
813 meta.setOxmJaxbContext(oxmJaxbContext);
815 String entityType = spikeEventVertex.getType();
816 if (entityType == null || entityType.isEmpty()) {
817 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
818 "Payload header missing entity type", eventPayload);
819 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
820 "Payload header missing entity type");
822 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
827 * test if entityType is in the model
830 VersionedOxmEntities oxmEntities =
831 EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
833 if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
834 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
836 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
837 "No matching OXM Descriptor for entity-type='" + entityType + "'");
839 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
844 String entityKey = spikeEventVertex.getKey();
845 if (entityKey == null || entityKey.isEmpty()) {
846 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
848 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
849 "Payload vertex missing entity key");
851 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
854 String entityLink = spikeEventVertex.getEntityLink();
855 if (entityLink == null || entityLink.isEmpty()) {
856 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
857 "Payload header missing entity link", eventPayload);
858 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
859 "Payload header missing entity link");
861 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
865 JSONObject vertexProperties = null;
868 vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
870 } catch (JSONException exc) {
871 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
872 "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
873 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
874 "Payload header missing " + VERTEX_PROPERTIES_KEY);
876 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
880 meta.setVertexProperties(vertexProperties);
882 // log the fact that all data are in good shape
883 logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
884 logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
891 protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
894 * These are all critical keys
897 final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
898 final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
899 final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
901 SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
902 eventVertex.setSchemaVersion(vertexSchemaVersion);
903 logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());