2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.datarouter.policy;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
34 import org.apache.camel.Exchange;
35 import org.apache.camel.Processor;
36 import org.eclipse.persistence.dynamic.DynamicType;
37 import org.eclipse.persistence.internal.helper.DatabaseField;
38 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
39 import org.eclipse.persistence.oxm.MediaType;
40 import org.json.JSONException;
41 import org.json.JSONObject;
42 import org.onap.aai.cl.api.Logger;
43 import org.onap.aai.cl.eelf.LoggerFactory;
44 import org.onap.aai.cl.mdc.MdcContext;
45 import org.onap.aai.schema.OxmModelLoader;
46 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
47 import org.onap.aai.datarouter.entity.SpikeEventEntity;
48 import org.onap.aai.datarouter.entity.SpikeEventMeta;
49 import org.onap.aai.datarouter.entity.SpikeEventVertex;
50 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
51 import org.onap.aai.datarouter.util.RouterServiceUtil;
52 import org.onap.aai.datarouter.util.SearchServiceAgent;
53 import org.onap.aai.entity.OxmEntityDescriptor;
54 import org.onap.aai.util.EntityOxmReferenceHelper;
55 import org.onap.aai.util.ExternalOxmModelProcessor;
56 import org.onap.aai.util.Version;
57 import org.onap.aai.util.VersionedOxmEntities;
58 import org.onap.aai.restclient.client.Headers;
59 import org.onap.aai.restclient.client.OperationResult;
60 import org.onap.aai.restclient.rest.HttpUtil;
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.JsonNode;
65 import com.fasterxml.jackson.databind.ObjectMapper;
67 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
69 protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
70 private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
72 protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
74 protected static final String UPDATE_NOTIFICATION = "update-notification";
75 protected static final String SPIKE = "SPIKE";
77 protected static final String HEADER_KEY = "header";
78 protected static final String EVENT_TYPE_KEY = "event-type";
79 protected static final String SOURCE_NAME_KEY = "source-name";
80 protected static final String BODY_KEY = "body";
81 protected static final String OPERATION_KEY = "operation";
83 protected static final String VERTEX_KEY = "vertex";
84 protected static final String VERTEX_PROPERTIES_KEY = "properties";
85 protected static final String VERTEX_KEY_KEY = "key";
86 protected static final String VERTEX_TYPE_KEY = "type";
87 protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
89 protected static final String CREATE = "create";
90 protected static final String DELETE = "delete";
91 protected static final String UPDATE = "update";
93 protected enum ResponseType {
94 SUCCESS, PARTIAL_SUCCESS, FAILURE;
97 protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
99 protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
100 protected String oxmVersion = null;
102 /** Agent for communicating with the Search Service. */
103 protected SearchServiceAgent searchAgent = null;
104 protected String searchIndexName;
105 protected String searchIndexSchema;
106 protected String createIndexUrl;
108 protected Logger logger;
109 protected Logger metricsLogger;
110 protected ObjectMapper mapper;
113 public AbstractSpikeEntityEventProcessor(SpikeEventPolicyConfig config)
114 throws FileNotFoundException {
115 mapper = new ObjectMapper();
116 LoggerFactory loggerFactoryInstance = LoggerFactory.getInstance();
117 logger = loggerFactoryInstance.getLogger(AbstractSpikeEntityEventProcessor.class.getName());
119 loggerFactoryInstance.getMetricsLogger(AbstractSpikeEntityEventProcessor.class.getName());
121 // Instantiate the agent that we will use for interacting with the Search Service.
122 searchAgent = new SearchServiceAgent(config.getSearchCertName(), config.getSearchKeystore(),
123 config.getSearchKeystorePwd(), AbstractSpikeEntityEventProcessor
124 .concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint()),
125 config.getSearchEndpointDocuments(), logger);
127 this.externalOxmModelProcessors = new ArrayList<>();
128 this.externalOxmModelProcessors.add(EntityOxmReferenceHelper.getInstance());
129 OxmModelLoader.registerExternalOxmModelProcessors(externalOxmModelProcessors);
130 OxmModelLoader.loadModels();
131 oxmVersionContextMap = OxmModelLoader.getVersionContextMap();
132 parseLatestOxmVersion();
135 public String getCreateIndexUrl() {
136 return createIndexUrl;
140 public void setCreateIndexUrl(String createIndexUrl) {
141 this.createIndexUrl = createIndexUrl;
144 public String getSearchIndexName() {
145 return searchIndexName;
149 public void setSearchIndexName(String searchIndexName) {
150 this.searchIndexName = searchIndexName;
153 public String getSearchIndexSchema() {
154 return searchIndexSchema;
158 public void setSearchIndexSchema(String searchIndexSchema) {
159 this.searchIndexSchema = searchIndexSchema;
162 protected void startup() {
167 * Load the UEB JSON payload, any errors would result to a failure case response.
169 protected JSONObject getUebContentAsJson(String payload, String contentKey) {
171 JSONObject uebJsonObj;
172 JSONObject uebObjContent;
175 uebJsonObj = new JSONObject(payload);
176 } catch (JSONException e) {
177 logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
178 logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
182 if (uebJsonObj.has(contentKey)) {
183 uebObjContent = uebJsonObj.getJSONObject(contentKey);
185 logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
186 logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
190 return uebObjContent;
192 public abstract void process(Exchange exchange) throws Exception;
195 private void parseLatestOxmVersion() {
196 int latestVersion = -1;
197 if (oxmVersionContextMap != null) {
198 Iterator it = oxmVersionContextMap.entrySet().iterator();
199 while (it.hasNext()) {
200 Map.Entry pair = (Map.Entry) it.next();
202 String version = pair.getKey().toString();
203 int versionNum = Integer.parseInt(version.substring(1, version.length()));
205 if (versionNum > latestVersion) {
206 latestVersion = versionNum;
207 oxmVersion = pair.getKey().toString();
210 logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
213 logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
220 * This will be used in: updateSearchEntityWithCrossEntityReference not this scope Convert object
223 * @param object the object
224 * @param pretty the pretty
226 * @throws JsonProcessingException the json processing exception
228 * protected static String convertObjectToJson(Object object, boolean pretty) throws
229 * JsonProcessingException { ObjectWriter ow;
231 * if (pretty) { ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
233 * } else { ow = new ObjectMapper().writer(); }
235 * return ow.writeValueAsString(object); }
238 protected void returnWithError(Exchange exchange, String payload, String errorMsg) {
239 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, errorMsg);
240 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, errorMsg, payload);
241 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
244 private boolean isJSONValid(String test) {
246 new JSONObject(test);
247 } catch (JSONException ex) {
255 protected String getSpikeEventAction(Exchange exchange, String uebPayload) {
256 JSONObject mainJson = new JSONObject(uebPayload);
257 String action = mainJson.getString(OPERATION_KEY);
258 if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
259 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
260 "Unrecognized action '" + action + "'", uebPayload);
261 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
262 "Unrecognized action '" + action + "'");
263 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
269 protected String getExchangeBody(Exchange exchange) {
270 String uebPayload = exchange.getIn().getBody().toString();
271 if (uebPayload == null || !isJSONValid(uebPayload)) {
272 uebPayload = exchange.getIn().getBody(String.class);
273 if (uebPayload == null || !isJSONValid(uebPayload)) {
274 returnWithError(exchange, uebPayload, "Invalid Payload");
281 protected SpikeEventVertex populateEventVertex(Exchange exchange, String uebPayload)
284 // Load the UEB payload data, any errors will result in a failure and discard
286 JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
287 if (spikeObjVertex == null) {
288 returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
292 SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
296 protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
297 DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
298 if (oxmJaxbContext == null) {
299 logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
300 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
302 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
305 return oxmJaxbContext;
309 protected String getEntityType(Exchange exchange, SpikeEventVertex eventVertex,
312 String entityType = eventVertex.getType();
313 if (entityType == null || entityType.isEmpty()) {
314 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
315 "Payload header missing entity type", uebPayload);
316 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
317 "Payload header missing entity type");
319 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
327 protected String getEntityLink(Exchange exchange, SpikeEventVertex eventVertex,
329 String entityKey = eventVertex.getKey();
330 if (entityKey == null || entityKey.isEmpty()) {
331 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
333 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
334 "Payload vertex missing entity key");
336 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
339 //EntityLink never can be null if entityKey is not null. no need to check
340 return eventVertex.getEntityLink();
347 * Use the OXM Model to determine the primary key field name based on the entity-type
349 protected SpikeEventEntity populateSpikeEventEntity(Exchange exchange,
350 SpikeEventEntity spikeEventEntity, DynamicJAXBContext oxmJaxbContext, String entityType,
351 String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
353 String entityPrimaryKeyFieldName =
354 getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
355 if (entityPrimaryKeyFieldName == null) {
356 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
357 "Payload missing primary key attribute");
358 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
359 "Payload missing primary key attribute", uebPayload);
360 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
363 String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
364 if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
365 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
366 "Payload missing primary value attribute");
367 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
368 "Payload missing primary value attribute", uebPayload);
370 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
375 if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
376 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
377 "Payload missing searchable attribute for entity type '" + entityType + "'");
378 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
379 "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
381 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
384 spikeEventEntity.setEntityPrimaryKeyName(entityPrimaryKeyFieldName);
385 spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldName);
388 spikeEventEntity.deriveFields();
390 } catch (NoSuchAlgorithmException e) {
391 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
392 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
395 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
398 return spikeEventEntity;
401 protected void setResponse(Exchange exchange, ResponseType responseType, String additionalInfo) {
403 exchange.getOut().setHeader("ResponseType", responseType.toString());
404 exchange.getOut().setBody(additionalInfo);
408 protected String getOxmEntityType(String entityType) {
409 return new OxmEntityTypeConverter().convert(entityType);
412 protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
413 String entityType, String uebPayload,Exchange exchange) {
414 List<String> searchableAttr =
415 getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
416 if (searchableAttr == null) {
417 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
418 "Searchable attribute not found for payload entity type '" + entityType + "'");
419 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
420 "Searchable attribute not found for payload entity type '" + entityType + "'",
423 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
426 return searchableAttr;
430 private SpikeEventVertex initializeSpikeEventVertex(String payload) {
432 SpikeEventVertex eventVertex = null;
433 ObjectMapper mapper = new ObjectMapper();
435 // Make sure that were were actually passed in a valid string.
436 if (payload == null || payload.isEmpty()) {
437 logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
438 logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
443 // Marshal the supplied string into a UebEventHeader object.
445 eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
446 } catch (JsonProcessingException e) {
447 logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
448 } catch (Exception e) {
449 logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
452 if (eventVertex != null) {
453 logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
460 private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
461 String oxmEntityType, String entityType) {
463 DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
464 if (entity == null) {
468 List<DatabaseField> list = entity.getDescriptor().getPrimaryKeyFields();
469 if (list != null && !list.isEmpty()) {
470 String keyName = list.get(0).getName();
471 return keyName.substring(0, keyName.indexOf('/'));
477 private String lookupValueUsingKey(String payload, String key) throws JSONException {
478 JsonNode jsonNode = convertToJsonNode(payload);
479 return RouterServiceUtil.recursivelyLookupJsonPayload(jsonNode, key);
483 private JsonNode convertToJsonNode(String payload) {
485 ObjectMapper mapper = new ObjectMapper();
486 JsonNode jsonNode = null;
488 jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
489 } catch (IOException e) {
490 logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
492 logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
500 private boolean getSearchTags(SpikeEventEntity spikeEventEntity, List<String> searchableAttr,
501 String payload, String action) {
503 boolean hasSearchableAttr = false;
504 for (String searchTagField : searchableAttr) {
505 String searchTagValue;
506 if (searchTagField.equalsIgnoreCase(spikeEventEntity.getEntityPrimaryKeyName())) {
507 searchTagValue = spikeEventEntity.getEntityPrimaryKeyValue();
509 searchTagValue = this.lookupValueUsingKey(payload, searchTagField);
512 if (searchTagValue != null && !searchTagValue.isEmpty()) {
513 hasSearchableAttr = true;
514 spikeEventEntity.addSearchTagWithKey(searchTagValue, searchTagField);
517 return hasSearchableAttr;
521 * Check if OXM version is available. If available, load it.
523 private DynamicJAXBContext loadOxmContext(String version) {
524 if (version == null) {
525 logger.error(EntityEventPolicyMsgs.FAILED_TO_FIND_OXM_VERSION, version);
529 return oxmVersionContextMap.get(version);
532 private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
533 String entityType, String fieldName) {
535 DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
536 if (entity == null) {
541 * Check for searchable XML tag
543 List<String> fieldValues = null;
544 Map<String, String> properties = entity.getDescriptor().getProperties();
545 for (Map.Entry<String, String> entry : properties.entrySet()) {
546 if (entry.getKey().equalsIgnoreCase(fieldName)) {
547 fieldValues = Arrays.asList(entry.getValue().split(","));
555 protected SpikeEventEntity getPopulatedEntity(JsonNode entityNode,
556 OxmEntityDescriptor resultDescriptor) {
557 SpikeEventEntity d = new SpikeEventEntity();
559 d.setEntityType(resultDescriptor.getEntityName());
561 List<String> primaryKeyValues = new ArrayList<>();
562 List<String> primaryKeyNames = new ArrayList<>();
565 for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
566 pkeyValue = RouterServiceUtil.getNodeFieldAsText(entityNode, keyName);
567 if (pkeyValue != null) {
568 primaryKeyValues.add(pkeyValue);
569 primaryKeyNames.add(keyName);
571 // logger.warn("getPopulatedDocument(), pKeyValue is null for entityType = " +
572 // resultDescriptor.getEntityName());
573 logger.error(EntityEventPolicyMsgs.PRIMARY_KEY_NULL_FOR_ENTITY_TYPE,
574 resultDescriptor.getEntityName());
578 final String primaryCompositeKeyValue = RouterServiceUtil.concatArray(primaryKeyValues, "/");
579 d.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
580 final String primaryCompositeKeyName = RouterServiceUtil.concatArray(primaryKeyNames, "/");
581 d.setEntityPrimaryKeyName(primaryCompositeKeyName);
583 final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
586 * Based on configuration, use the configured field names for this entity-Type to build a
587 * multi-value collection of search tags for elastic search entity search criteria.
591 for (String searchTagField : searchTagFields) {
592 String searchTagValue = RouterServiceUtil.getNodeFieldAsText(entityNode, searchTagField);
593 if (searchTagValue != null && !searchTagValue.isEmpty()) {
594 d.addSearchTagWithKey(searchTagValue, searchTagField);
603 // put this here until we find a better spot
605 * Helper utility to concatenate substrings of a URI together to form a proper URI.
607 * @param suburis the list of substrings to concatenate together
608 * @return the concatenated list of substrings
610 private static String concatSubUri(String... suburis) {
611 String finalUri = "";
613 for (String suburi : suburis) {
615 if (suburi != null) {
616 // Remove any leading / since we only want to append /
617 suburi = suburi.replaceFirst("^/*", "");
619 // Add a trailing / if one isn't already there
620 finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
630 * Perform create, read, update or delete (CRUD) operation on search engine's suggestive search
633 * @param eventEntity Entity/data to use in operation
634 * @param action The operation to perform
636 protected void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
640 Map<String, List<String>> headers = new HashMap<>();
641 headers.put(Headers.FROM_APP_ID, Arrays.asList("DataLayer"));
642 headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
644 String entityId = eventEntity.getId();
646 if ((action.equalsIgnoreCase(CREATE) && entityId != null)
647 || action.equalsIgnoreCase(UPDATE)) {
649 // Run the GET to retrieve the ETAG from the search service
650 OperationResult storedEntity = searchAgent.getDocument(index, entityId);
652 if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
653 List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
655 if (etag != null && !etag.isEmpty()) {
656 headers.put(Headers.IF_MATCH, etag);
658 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
662 // Write the entity to the search service.
664 searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
665 } else if (action.equalsIgnoreCase(CREATE)) {
666 // Write the entry to the search service.
667 searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
669 } else if (action.equalsIgnoreCase(DELETE)) {
670 // Run the GET to retrieve the ETAG from the search service
671 OperationResult storedEntity = searchAgent.getDocument(index, entityId);
673 if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
674 List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
676 if (etag != null && !etag.isEmpty()) {
677 headers.put(Headers.IF_MATCH, etag);
679 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
683 * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
684 * we specify a Content-Type.
687 headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
689 searchAgent.deleteDocument(index, eventEntity.getId(), headers);
691 logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
694 logger.error(EntityEventPolicyMsgs.ENTITY_OPERATION_NOT_SUPPORTED, action);
696 } catch (IOException e) {
697 logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, eventEntity.getId(),
702 protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
704 SpikeEventMeta meta = new SpikeEventMeta();
705 Object eventPayloadObj = null;
706 String eventPayload = null;
708 eventPayloadObj = exchange.getIn().getBody();
711 * It is expected that mainJson will have multiple top level objects: - header - body - result
713 if (eventPayloadObj == null) {
714 returnWithError(exchange, null, "Invalid Payload");
718 eventPayload = (String)eventPayloadObj;
720 meta.setEventEntity(new JSONObject(eventPayload));
721 } catch (JSONException exc) {
722 returnWithError(exchange, eventPayload, "Invalid Payload");
726 JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
728 if (eventHeader == null) {
729 returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
733 meta.setEventHeader(eventHeader);
736 * Only process SPIKE update-notification events
739 final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
740 final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
742 if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
744 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
745 + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
746 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
747 "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
750 * I don't think ignoring a non-applicable event constitutes a failure.
753 setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
757 JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
759 if (eventBody == null) {
760 returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
764 meta.setEventBody(eventBody);
766 String action = eventBody.getString(OPERATION_KEY);
767 if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
768 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
769 "Unrecognized action '" + action + "'", eventPayload);
770 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
771 "Unrecognized action '" + action + "'");
772 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
775 meta.setBodyOperationType(action);
777 // Load the event body data, any errors will result in a failure and discard
779 JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
780 if (spikeVertex == null) {
781 returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
785 meta.setSpikeVertex(spikeVertex);
787 SpikeEventVertex spikeEventVertex = null;
789 spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
790 } catch (JSONException exc) {
791 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
792 "Error initializating spike event. Error: " + exc.getMessage(), eventPayload);
793 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
794 "Error initializating spike event. Error: " + exc.getMessage());
796 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
800 meta.setSpikeEventVertex(spikeEventVertex);
802 DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
803 if (oxmJaxbContext == null) {
804 logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
805 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
808 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
812 meta.setOxmJaxbContext(oxmJaxbContext);
814 String entityType = spikeEventVertex.getType();
815 if (entityType == null || entityType.isEmpty()) {
816 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
817 "Payload header missing entity type", eventPayload);
818 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
819 "Payload header missing entity type");
821 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
826 * test if entityType is in the model
829 VersionedOxmEntities oxmEntities =
830 EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
832 if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
833 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
835 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
836 "No matching OXM Descriptor for entity-type='" + entityType + "'");
838 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
843 String entityKey = spikeEventVertex.getKey();
844 if (entityKey == null || entityKey.isEmpty()) {
845 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
847 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
848 "Payload vertex missing entity key");
850 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
853 String entityLink = spikeEventVertex.getEntityLink();
854 if (entityLink == null || entityLink.isEmpty()) {
855 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
856 "Payload header missing entity link", eventPayload);
857 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
858 "Payload header missing entity link");
860 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
864 JSONObject vertexProperties = null;
867 vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
869 } catch (JSONException exc) {
870 logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
871 "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
872 logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
873 "Payload header missing " + VERTEX_PROPERTIES_KEY);
875 setResponse(exchange, ResponseType.FAILURE, additionalInfo);
879 meta.setVertexProperties(vertexProperties);
881 // log the fact that all data are in good shape
882 logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
883 logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
890 protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
893 * These are all critical keys
896 final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
897 final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
898 final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
900 SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
901 eventVertex.setSchemaVersion(vertexSchemaVersion);
902 logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());