Prefer default field if available
[aai/data-router.git] / src / main / java / org / onap / aai / datarouter / policy / AbstractSpikeEntityEventProcessor.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.datarouter.policy;
22
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33
34 import org.apache.camel.Exchange;
35 import org.apache.camel.Processor;
36 import org.eclipse.persistence.dynamic.DynamicType;
37 import org.eclipse.persistence.internal.helper.DatabaseField;
38 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
39 import org.eclipse.persistence.oxm.MediaType;
40 import org.json.JSONException;
41 import org.json.JSONObject;
42 import org.onap.aai.cl.api.Logger;
43 import org.onap.aai.cl.eelf.LoggerFactory;
44 import org.onap.aai.cl.mdc.MdcContext;
45 import org.onap.aai.datarouter.util.DataRouterProperties;
46 import org.onap.aai.schema.OxmModelLoader;
47 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
48 import org.onap.aai.datarouter.entity.SpikeEventEntity;
49 import org.onap.aai.datarouter.entity.SpikeEventMeta;
50 import org.onap.aai.datarouter.entity.SpikeEventVertex;
51 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
52 import org.onap.aai.datarouter.util.RouterServiceUtil;
53 import org.onap.aai.datarouter.util.SearchServiceAgent;
54 import org.onap.aai.entity.OxmEntityDescriptor;
55 import org.onap.aai.util.EntityOxmReferenceHelper;
56 import org.onap.aai.util.ExternalOxmModelProcessor;
57 import org.onap.aai.util.Version;
58 import org.onap.aai.util.VersionedOxmEntities;
59 import org.onap.aai.restclient.client.Headers;
60 import org.onap.aai.restclient.client.OperationResult;
61 import org.onap.aai.restclient.rest.HttpUtil;
62 import org.onap.aai.setup.SchemaVersions;
63 import org.slf4j.MDC;
64
65 import com.fasterxml.jackson.core.JsonProcessingException;
66 import com.fasterxml.jackson.databind.JsonNode;
67 import com.fasterxml.jackson.databind.ObjectMapper;
68
69 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
70
71   protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
72   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
73
74   protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
75
76   protected static final String UPDATE_NOTIFICATION = "update-notification";
77   protected static final String SPIKE = "SPIKE";
78   
79   protected static final String HEADER_KEY = "header";
80   protected static final String EVENT_TYPE_KEY = "event-type";
81   protected static final String SOURCE_NAME_KEY = "source-name";
82   protected static final String BODY_KEY = "body";
83   protected static final String OPERATION_KEY = "operation";
84   
85   protected static final String EDGE_KEY = "relationship";
86   
87   protected static final String VERTEX_KEY = "vertex";
88   protected static final String VERTEX_PROPERTIES_KEY = "properties";
89   protected static final String VERTEX_KEY_KEY = "key";
90   protected static final String VERTEX_TYPE_KEY = "type";
91   protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
92   
93   protected static final String CREATE = "create";
94   protected static final String DELETE = "delete";
95   protected static final String UPDATE = "update";
96   
97   protected enum ResponseType {
98     SUCCESS, PARTIAL_SUCCESS, FAILURE;
99   }
100
101   protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
102
103   protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
104   protected String oxmVersion = null;
105
106   /** Agent for communicating with the Search Service. */
107   protected SearchServiceAgent searchAgent = null;
108   protected String searchIndexName;
109   protected String searchIndexSchema;
110   protected String createIndexUrl;
111
112   protected Logger logger;
113   protected Logger metricsLogger;
114   protected ObjectMapper mapper;
115   
116   public AbstractSpikeEntityEventProcessor(SpikeEventPolicyConfig config)
117       throws FileNotFoundException {
118     mapper = new ObjectMapper();
119     LoggerFactory loggerFactoryInstance = LoggerFactory.getInstance();
120     logger = loggerFactoryInstance.getLogger(AbstractSpikeEntityEventProcessor.class.getName());
121     metricsLogger =
122         loggerFactoryInstance.getMetricsLogger(AbstractSpikeEntityEventProcessor.class.getName());
123
124     // Instantiate the agent that we will use for interacting with the Search Service.
125     searchAgent = new SearchServiceAgent(config.getSearchCertName(), config.getSearchKeystore(),
126         config.getSearchKeystorePwd(), AbstractSpikeEntityEventProcessor
127             .concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint()),
128         config.getSearchEndpointDocuments(), logger);
129
130     this.externalOxmModelProcessors = new ArrayList<>();
131     this.externalOxmModelProcessors.add(EntityOxmReferenceHelper.getInstance());
132     OxmModelLoader.registerExternalOxmModelProcessors(externalOxmModelProcessors);
133     OxmModelLoader.loadModels(config.getSchemaVersions(), config.getSchemaLocationsBean());
134     oxmVersionContextMap = OxmModelLoader.getVersionContextMap();
135     parseLatestOxmVersion();
136   }
137
138   public String getCreateIndexUrl() {
139     return createIndexUrl;
140   }
141
142
143   public void setCreateIndexUrl(String createIndexUrl) {
144     this.createIndexUrl = createIndexUrl;
145   }
146   
147   public String getSearchIndexName() {
148     return searchIndexName;
149   }
150
151
152   public void setSearchIndexName(String searchIndexName) {
153     this.searchIndexName = searchIndexName;
154   }
155
156   public String getSearchIndexSchema() {
157     return searchIndexSchema;
158   }
159
160
161   public void setSearchIndexSchema(String searchIndexSchema) {
162     this.searchIndexSchema = searchIndexSchema;
163   }
164
165   protected void startup() {
166
167   }
168
169   /*
170    * Load the UEB JSON payload, any errors would result to a failure case response.
171    */
172   protected JSONObject getUebContentAsJson(String payload, String contentKey) {
173
174     JSONObject uebJsonObj;
175     JSONObject uebObjContent;
176
177     try {
178       uebJsonObj = new JSONObject(payload);
179     } catch (JSONException e) {
180       logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
181       logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
182       return null;
183     }
184
185     if (uebJsonObj.has(contentKey)) {
186       uebObjContent = uebJsonObj.getJSONObject(contentKey);
187     } else {
188       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
189       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
190       return null;
191     }
192
193     return uebObjContent;
194   }
195   public abstract void process(Exchange exchange) throws Exception;
196
197
198   private void parseLatestOxmVersion() {
199     int latestVersion = -1;
200     if (oxmVersionContextMap != null) {
201       Iterator it = oxmVersionContextMap.entrySet().iterator();
202       while (it.hasNext()) {
203         Map.Entry pair = (Map.Entry) it.next();
204
205         String version = pair.getKey().toString();
206         int versionNum = Integer.parseInt(version.substring(1, version.length()));
207
208         if (versionNum > latestVersion) {
209           latestVersion = versionNum;
210           oxmVersion = pair.getKey().toString();
211         }
212
213         logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
214       }
215     } else {
216       logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
217     }
218   }
219
220
221
222   /**
223    * This will be used in: updateSearchEntityWithCrossEntityReference not this scope Convert object
224    * to json.
225    *
226    * @param object the object
227    * @param pretty the pretty
228    * @return the string
229    * @throws JsonProcessingException the json processing exception
230    * 
231    *         protected static String convertObjectToJson(Object object, boolean pretty) throws
232    *         JsonProcessingException { ObjectWriter ow;
233    * 
234    *         if (pretty) { ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
235    * 
236    *         } else { ow = new ObjectMapper().writer(); }
237    * 
238    *         return ow.writeValueAsString(object); }
239    */
240
241   protected void returnWithError(Exchange exchange, String payload, String errorMsg) {
242     logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, errorMsg);
243     logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, errorMsg, payload);
244     setResponse(exchange, ResponseType.FAILURE, additionalInfo);
245   }
246
247   private boolean isJSONValid(String test) {
248     try {
249       new JSONObject(test);
250     } catch (JSONException ex) {
251       return false;
252     }
253     return true;
254   }
255
256
257
258   protected String getSpikeEventAction(Exchange exchange, String uebPayload) {
259     JSONObject mainJson = new JSONObject(uebPayload);
260     String action = mainJson.getString(OPERATION_KEY);
261     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
262       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
263           "Unrecognized action '" + action + "'", uebPayload);
264       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
265           "Unrecognized action '" + action + "'");
266       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
267       return null;
268     }
269     return action;
270   }
271
272   protected String getExchangeBody(Exchange exchange) {
273     String uebPayload = exchange.getIn().getBody().toString();
274     if (uebPayload == null || !isJSONValid(uebPayload)) {
275       uebPayload = exchange.getIn().getBody(String.class);
276       if (uebPayload == null || !isJSONValid(uebPayload)) {
277         returnWithError(exchange, uebPayload, "Invalid Payload");
278         return null;
279       }
280     }
281     return uebPayload;
282   }
283
284   protected SpikeEventVertex populateEventVertex(Exchange exchange, String uebPayload)
285       throws Exception {
286
287     // Load the UEB payload data, any errors will result in a failure and discard
288
289     JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
290     if (spikeObjVertex == null) {
291       returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
292       return null;
293     }
294
295     SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
296     return eventVertex;
297   }
298
299   protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
300     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
301     if (oxmJaxbContext == null) {
302       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
303       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
304
305       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
306       return null;
307     }
308     return oxmJaxbContext;
309   }
310
311
312   protected String getEntityType(Exchange exchange, SpikeEventVertex eventVertex,
313       String uebPayload) {
314
315     String entityType = eventVertex.getType();
316     if (entityType == null || entityType.isEmpty()) {
317       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
318           "Payload header missing entity type", uebPayload);
319       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
320           "Payload header missing entity type");
321
322       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
323       return null;
324     }
325     return entityType;
326   }
327
328
329
330   protected String getEntityLink(Exchange exchange, SpikeEventVertex eventVertex,
331       String uebPayload) {
332     String entityKey = eventVertex.getKey();
333     if (entityKey == null || entityKey.isEmpty()) {
334       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
335           uebPayload);
336       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
337           "Payload vertex missing entity key");
338
339       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
340       return null;
341     }
342     //EntityLink never can be null if  entityKey is not null. no need to check
343     return eventVertex.getEntityLink();
344
345   }
346
347
348
349   /*
350    * Use the OXM Model to determine the primary key field name based on the entity-type
351    */
352   protected SpikeEventEntity populateSpikeEventEntity(Exchange exchange,
353       SpikeEventEntity spikeEventEntity, DynamicJAXBContext oxmJaxbContext, String entityType,
354       String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
355      
356     String entityPrimaryKeyFieldName =
357         getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
358     if (entityPrimaryKeyFieldName == null) {
359       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
360           "Payload missing primary key attribute");
361       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
362           "Payload missing primary key attribute", uebPayload);
363       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
364       return null;
365     }
366     String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
367     if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
368       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
369           "Payload missing primary value attribute");
370       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
371           "Payload missing primary value attribute", uebPayload);
372
373       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
374       return null;
375     }
376
377
378     if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
379       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
380           "Payload missing searchable attribute for entity type '" + entityType + "'");
381       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
382           "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
383
384       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
385       return null;
386     }
387     spikeEventEntity.setEntityPrimaryKeyName(entityPrimaryKeyFieldName);
388     spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldName);
389
390     try {
391       spikeEventEntity.deriveFields();
392
393     } catch (NoSuchAlgorithmException e) {
394       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
395       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
396           uebPayload);
397
398       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
399       return null;
400     }
401     return spikeEventEntity;
402   }
403
404   protected void setResponse(Exchange exchange, ResponseType responseType, String additionalInfo) {
405
406     exchange.getOut().setHeader("ResponseType", responseType.toString());
407     exchange.getOut().setBody(additionalInfo);
408   }
409
410
411  protected String getOxmEntityType(String entityType) {
412    return new OxmEntityTypeConverter().convert(entityType);
413  }
414  
415  protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
416      String entityType, String uebPayload,Exchange exchange) {
417    List<String> searchableAttr =
418        getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
419    if (searchableAttr == null) {
420      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
421          "Searchable attribute not found for payload entity type '" + entityType + "'");
422      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
423          "Searchable attribute not found for payload entity type '" + entityType + "'",
424          uebPayload);
425
426      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
427      return null;
428    }
429    return searchableAttr;
430  }
431
432
433   private SpikeEventVertex initializeSpikeEventVertex(String payload) {
434
435     SpikeEventVertex eventVertex = null;
436     ObjectMapper mapper = new ObjectMapper();
437
438     // Make sure that were were actually passed in a valid string.
439     if (payload == null || payload.isEmpty()) {
440       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
441       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
442
443       return eventVertex;
444     }
445
446     // Marshal the supplied string into a UebEventHeader object.
447     try {
448       eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
449     } catch (JsonProcessingException e) {
450       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
451     } catch (Exception e) {
452       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
453     }
454
455     if (eventVertex != null) {
456       logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
457     }
458
459     return eventVertex;
460   }
461
462
463   private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
464       String oxmEntityType, String entityType) {
465     String defaultKey = DataRouterProperties.get("entity.primary.field");
466     if (defaultKey != null) {
467       return defaultKey;
468     }
469
470     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
471     if (entity == null) {
472       return null;
473     }
474
475     List<DatabaseField> list = entity.getDescriptor().getPrimaryKeyFields();
476     if (list != null && !list.isEmpty()) {
477       String keyName = list.get(0).getName();
478       return keyName.substring(0, keyName.indexOf('/'));
479     }
480
481     return "";
482   }
483
484   private String lookupValueUsingKey(String payload, String key) throws JSONException {
485     JsonNode jsonNode = convertToJsonNode(payload);
486     return RouterServiceUtil.recursivelyLookupJsonPayload(jsonNode, key);
487   }
488
489
490   private JsonNode convertToJsonNode(String payload) {
491
492     ObjectMapper mapper = new ObjectMapper();
493     JsonNode jsonNode = null;
494     try {
495       jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
496     } catch (IOException e) {
497       logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
498           payload);
499       logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
500           "");
501     }
502
503     return jsonNode;
504   }
505
506
507   private boolean getSearchTags(SpikeEventEntity spikeEventEntity, List<String> searchableAttr,
508       String payload, String action) {
509
510     boolean hasSearchableAttr = false;
511     for (String searchTagField : searchableAttr) {
512       String searchTagValue;
513       if (searchTagField.equalsIgnoreCase(spikeEventEntity.getEntityPrimaryKeyName())) {
514         searchTagValue = spikeEventEntity.getEntityPrimaryKeyValue();
515       } else {
516         searchTagValue = this.lookupValueUsingKey(payload, searchTagField);
517       }
518
519       if (searchTagValue != null && !searchTagValue.isEmpty()) {
520         hasSearchableAttr = true;
521         spikeEventEntity.addSearchTagWithKey(searchTagValue, searchTagField);
522       }
523     }
524     return hasSearchableAttr;
525   }
526
527   /*
528    * Check if OXM version is available. If available, load it.
529    */
530   private DynamicJAXBContext loadOxmContext(String version) {
531     if (version == null) {
532       logger.error(EntityEventPolicyMsgs.FAILED_TO_FIND_OXM_VERSION, version);
533       return null;
534     }
535
536     return oxmVersionContextMap.get(version);
537   }
538
539   private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
540       String entityType, String fieldName) {
541
542     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
543     if (entity == null) {
544       return null;
545     }
546
547     /*
548      * Check for searchable XML tag
549      */
550     List<String> fieldValues = null;
551     Map<String, String> properties = entity.getDescriptor().getProperties();
552     for (Map.Entry<String, String> entry : properties.entrySet()) {
553       if (entry.getKey().equalsIgnoreCase(fieldName)) {
554         fieldValues = Arrays.asList(entry.getValue().split(","));
555         break;
556       }
557     }
558
559     return fieldValues;
560   }
561
562   protected SpikeEventEntity getPopulatedEntity(JsonNode entityNode,
563       OxmEntityDescriptor resultDescriptor) {
564     SpikeEventEntity d = new SpikeEventEntity();
565
566     d.setEntityType(resultDescriptor.getEntityName());
567
568     List<String> primaryKeyValues = new ArrayList<>();
569     List<String> primaryKeyNames = new ArrayList<>();
570     String pkeyValue;
571
572     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
573       pkeyValue = RouterServiceUtil.getNodeFieldAsText(entityNode, keyName);
574       if (pkeyValue != null) {
575         primaryKeyValues.add(pkeyValue);
576         primaryKeyNames.add(keyName);
577       } else {
578         // logger.warn("getPopulatedDocument(), pKeyValue is null for entityType = " +
579         // resultDescriptor.getEntityName());
580         logger.error(EntityEventPolicyMsgs.PRIMARY_KEY_NULL_FOR_ENTITY_TYPE,
581             resultDescriptor.getEntityName());
582       }
583     }
584
585     final String primaryCompositeKeyValue = RouterServiceUtil.concatArray(primaryKeyValues, "/");
586     d.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
587     final String primaryCompositeKeyName = RouterServiceUtil.concatArray(primaryKeyNames, "/");
588     d.setEntityPrimaryKeyName(primaryCompositeKeyName);
589
590     final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
591
592     /*
593      * Based on configuration, use the configured field names for this entity-Type to build a
594      * multi-value collection of search tags for elastic search entity search criteria.
595      */
596
597
598     for (String searchTagField : searchTagFields) {
599       String searchTagValue = RouterServiceUtil.getNodeFieldAsText(entityNode, searchTagField);
600       if (searchTagValue != null && !searchTagValue.isEmpty()) {
601         d.addSearchTagWithKey(searchTagValue, searchTagField);
602       }
603     }
604
605     return d;
606   }
607
608
609
610   // put this here until we find a better spot
611   /**
612    * Helper utility to concatenate substrings of a URI together to form a proper URI.
613    * 
614    * @param suburis the list of substrings to concatenate together
615    * @return the concatenated list of substrings
616    */
617   private static String concatSubUri(String... suburis) {
618     String finalUri = "";
619
620     for (String suburi : suburis) {
621
622       if (suburi != null) {
623         // Remove any leading / since we only want to append /
624         suburi = suburi.replaceFirst("^/*", "");
625
626         // Add a trailing / if one isn't already there
627         finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
628       }
629     }
630
631     return finalUri;
632   }
633
634
635
636   /**
637    * Perform create, read, update or delete (CRUD) operation on search engine's suggestive search
638    * index
639    * 
640    * @param eventEntity Entity/data to use in operation
641    * @param action The operation to perform
642    */
643   protected void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
644       String index) {
645     try {
646
647       Map<String, List<String>> headers = new HashMap<>();
648       headers.put(Headers.FROM_APP_ID, Arrays.asList("DataLayer"));
649       headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
650
651       String entityId = eventEntity.getId();
652
653       if ((action.equalsIgnoreCase(CREATE) && entityId != null)
654           || action.equalsIgnoreCase(UPDATE)) {
655
656         // Run the GET to retrieve the ETAG from the search service
657         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
658
659         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
660           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
661
662           if (etag != null && !etag.isEmpty()) {
663             headers.put(Headers.IF_MATCH, etag);
664           } else {
665             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
666           }
667         }
668
669         // Write the entity to the search service.
670         // PUT
671         searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
672       } else if (action.equalsIgnoreCase(CREATE)) {
673         // Write the entry to the search service.
674         searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
675
676       } else if (action.equalsIgnoreCase(DELETE)) {
677         // Run the GET to retrieve the ETAG from the search service
678         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
679
680         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
681           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
682
683           if (etag != null && !etag.isEmpty()) {
684             headers.put(Headers.IF_MATCH, etag);
685           } else {
686             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
687           }
688
689           /*
690            * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
691            * we specify a Content-Type.
692            */
693
694           headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
695
696           searchAgent.deleteDocument(index, eventEntity.getId(), headers);
697         } else {
698           logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
699         }
700       } else {
701         logger.error(EntityEventPolicyMsgs.ENTITY_OPERATION_NOT_SUPPORTED, action);
702       }
703     } catch (IOException e) {
704       logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, eventEntity.getId(),
705           action);
706     }
707   }
708   
709   protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
710
711     SpikeEventMeta meta = new SpikeEventMeta();
712     Object eventPayloadObj = null;
713     String eventPayload = null;
714     try {
715        eventPayloadObj = exchange.getIn().getBody();    
716       
717       /*
718        * It is expected that mainJson will have multiple top level objects: - header - body - result
719        */
720       if (eventPayloadObj == null) {
721         returnWithError(exchange, null, "Invalid Payload");
722         return null;
723       }
724       
725       eventPayload = (String)eventPayloadObj;
726        
727       meta.setEventEntity(new JSONObject(eventPayload));
728     } catch (JSONException exc) {
729       returnWithError(exchange, eventPayload, "Invalid Payload");
730       return null;
731     }
732
733     JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
734
735     if (eventHeader == null) {
736       returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
737       return null;
738     }
739
740     meta.setEventHeader(eventHeader);
741
742     /*
743      * Only process SPIKE update-notification events
744      */
745
746     final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
747     final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
748
749     if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
750       // drop event
751       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
752           + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
753       logger.info(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
754           "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
755
756       /*
757        * I don't think ignoring a non-applicable event constitutes a failure.
758        */
759
760       setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
761       return null;
762     }
763
764     JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
765
766     if (eventBody == null) {
767       returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
768       return null;
769     }
770
771     meta.setEventBody(eventBody);
772
773     String action = eventBody.getString(OPERATION_KEY);
774     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
775       returnWithError(exchange, eventPayload, "Unrecognized action '" + action + "'");
776       return null;
777     }
778     meta.setBodyOperationType(action);
779
780     
781     /*
782      * Ignore spike events for edges
783      */
784     if (eventBody.has(EDGE_KEY)){
785       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event for edge with sourceName='"
786           + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
787       logger.info(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
788           "Ignoring event for edge with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
789       setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
790       return null;      
791     }
792
793     
794     // Load the event body data, any errors will result in a failure and discard
795     JSONObject spikeVertex = null;
796     try {
797       spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
798     } catch (JSONException exc) {
799       returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
800       return null;
801     }    
802
803     meta.setSpikeVertex(spikeVertex);
804
805     SpikeEventVertex spikeEventVertex = null;
806     try {
807       spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
808     } catch (JSONException exc) {
809       returnWithError(exchange, eventPayload, "Error initializating spike event.  Error: " + exc.getMessage());
810       return null;
811     }
812
813     meta.setSpikeEventVertex(spikeEventVertex);
814
815     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
816     if (oxmJaxbContext == null) {
817       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
818       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
819           eventPayload);
820
821       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
822       return null;
823     }
824
825     meta.setOxmJaxbContext(oxmJaxbContext);
826
827     String entityType = spikeEventVertex.getType();
828     if (entityType == null || entityType.isEmpty()) {
829       returnWithError(exchange, eventPayload, "Payload vertex missing entity type");
830       return null;
831     }
832     
833     /*
834      * test if entityType is in the model
835      */
836
837     VersionedOxmEntities oxmEntities =
838     EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
839     
840     if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
841       returnWithError(exchange, eventPayload, "No matching OXM Descriptor for entity-type='" + entityType + "'");
842       return null;
843     }
844
845     
846     String entityKey = spikeEventVertex.getKey();
847     if (entityKey == null || entityKey.isEmpty()) {
848       returnWithError(exchange, eventPayload, "Payload vertex missing entity key");
849       return null;
850     }
851
852     String entityLink = spikeEventVertex.getEntityLink();
853     if (entityLink == null || entityLink.isEmpty()) {
854       returnWithError(exchange, eventPayload, "Payload vertex missing entity link");
855       return null;
856     }
857
858     JSONObject vertexProperties = null;
859     try {
860       vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
861     } catch (JSONException exc) {
862       returnWithError(exchange, eventPayload, "Payload vertex missing " + VERTEX_PROPERTIES_KEY);
863       return null;
864     }
865
866     meta.setVertexProperties(vertexProperties);
867
868     // log the fact that all data are in good shape
869     logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
870     logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
871         eventPayload);
872
873     return meta;
874
875   }
876   
877   protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
878
879     /*
880      * These are all critical keys
881      */
882
883     final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
884     final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
885     final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
886
887     SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
888     eventVertex.setSchemaVersion(vertexSchemaVersion);
889     logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
890
891     return eventVertex;
892
893   }
894   
895 }