9f96a22cf9c00a33c56de8e76297db07f2d3db1b
[aai/data-router.git] / src / main / java / org / onap / aai / datarouter / policy / AbstractSpikeEntityEventProcessor.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.datarouter.policy;
22
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33
34 import org.apache.camel.Exchange;
35 import org.apache.camel.Processor;
36 import org.eclipse.persistence.dynamic.DynamicType;
37 import org.eclipse.persistence.internal.helper.DatabaseField;
38 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
39 import org.eclipse.persistence.oxm.MediaType;
40 import org.json.JSONException;
41 import org.json.JSONObject;
42 import org.onap.aai.cl.api.Logger;
43 import org.onap.aai.cl.eelf.LoggerFactory;
44 import org.onap.aai.cl.mdc.MdcContext;
45 import org.onap.aai.schema.OxmModelLoader;
46 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
47 import org.onap.aai.datarouter.entity.SpikeEventEntity;
48 import org.onap.aai.datarouter.entity.SpikeEventMeta;
49 import org.onap.aai.datarouter.entity.SpikeEventVertex;
50 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
51 import org.onap.aai.datarouter.util.RouterServiceUtil;
52 import org.onap.aai.datarouter.util.SearchServiceAgent;
53 import org.onap.aai.entity.OxmEntityDescriptor;
54 import org.onap.aai.util.EntityOxmReferenceHelper;
55 import org.onap.aai.util.ExternalOxmModelProcessor;
56 import org.onap.aai.util.Version;
57 import org.onap.aai.util.VersionedOxmEntities;
58 import org.onap.aai.restclient.client.Headers;
59 import org.onap.aai.restclient.client.OperationResult;
60 import org.onap.aai.restclient.rest.HttpUtil;
61 import org.onap.aai.setup.SchemaVersions;
62 import org.slf4j.MDC;
63
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.JsonNode;
66 import com.fasterxml.jackson.databind.ObjectMapper;
67
68 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
69
70   protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
71   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
72
73   protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
74
75   protected static final String UPDATE_NOTIFICATION = "update-notification";
76   protected static final String SPIKE = "SPIKE";
77   
78   protected static final String HEADER_KEY = "header";
79   protected static final String EVENT_TYPE_KEY = "event-type";
80   protected static final String SOURCE_NAME_KEY = "source-name";
81   protected static final String BODY_KEY = "body";
82   protected static final String OPERATION_KEY = "operation";
83   
84   protected static final String VERTEX_KEY = "vertex";
85   protected static final String VERTEX_PROPERTIES_KEY = "properties";
86   protected static final String VERTEX_KEY_KEY = "key";
87   protected static final String VERTEX_TYPE_KEY = "type";
88   protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
89   
90   protected static final String CREATE = "create";
91   protected static final String DELETE = "delete";
92   protected static final String UPDATE = "update";
93   
94   protected enum ResponseType {
95     SUCCESS, PARTIAL_SUCCESS, FAILURE;
96   }
97
98   protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
99
100   protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
101   protected String oxmVersion = null;
102
103   /** Agent for communicating with the Search Service. */
104   protected SearchServiceAgent searchAgent = null;
105   protected String searchIndexName;
106   protected String searchIndexSchema;
107   protected String createIndexUrl;
108
109   protected Logger logger;
110   protected Logger metricsLogger;
111   protected ObjectMapper mapper;
112   
113   public AbstractSpikeEntityEventProcessor(SpikeEventPolicyConfig config)
114       throws FileNotFoundException {
115     mapper = new ObjectMapper();
116     LoggerFactory loggerFactoryInstance = LoggerFactory.getInstance();
117     logger = loggerFactoryInstance.getLogger(AbstractSpikeEntityEventProcessor.class.getName());
118     metricsLogger =
119         loggerFactoryInstance.getMetricsLogger(AbstractSpikeEntityEventProcessor.class.getName());
120
121     // Instantiate the agent that we will use for interacting with the Search Service.
122     searchAgent = new SearchServiceAgent(config.getSearchCertName(), config.getSearchKeystore(),
123         config.getSearchKeystorePwd(), AbstractSpikeEntityEventProcessor
124             .concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint()),
125         config.getSearchEndpointDocuments(), logger);
126
127     this.externalOxmModelProcessors = new ArrayList<>();
128     this.externalOxmModelProcessors.add(EntityOxmReferenceHelper.getInstance());
129     OxmModelLoader.registerExternalOxmModelProcessors(externalOxmModelProcessors);
130     OxmModelLoader.loadModels(config.getSchemaVersions(), config.getSchemaLocationsBean());
131     oxmVersionContextMap = OxmModelLoader.getVersionContextMap();
132     parseLatestOxmVersion();
133   }
134
135   public String getCreateIndexUrl() {
136     return createIndexUrl;
137   }
138
139
140   public void setCreateIndexUrl(String createIndexUrl) {
141     this.createIndexUrl = createIndexUrl;
142   }
143   
144   public String getSearchIndexName() {
145     return searchIndexName;
146   }
147
148
149   public void setSearchIndexName(String searchIndexName) {
150     this.searchIndexName = searchIndexName;
151   }
152
153   public String getSearchIndexSchema() {
154     return searchIndexSchema;
155   }
156
157
158   public void setSearchIndexSchema(String searchIndexSchema) {
159     this.searchIndexSchema = searchIndexSchema;
160   }
161
162   protected void startup() {
163
164   }
165
166   /*
167    * Load the UEB JSON payload, any errors would result to a failure case response.
168    */
169   protected JSONObject getUebContentAsJson(String payload, String contentKey) {
170
171     JSONObject uebJsonObj;
172     JSONObject uebObjContent;
173
174     try {
175       uebJsonObj = new JSONObject(payload);
176     } catch (JSONException e) {
177       logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
178       logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
179       return null;
180     }
181
182     if (uebJsonObj.has(contentKey)) {
183       uebObjContent = uebJsonObj.getJSONObject(contentKey);
184     } else {
185       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
186       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
187       return null;
188     }
189
190     return uebObjContent;
191   }
192   public abstract void process(Exchange exchange) throws Exception;
193
194
195   private void parseLatestOxmVersion() {
196     int latestVersion = -1;
197     if (oxmVersionContextMap != null) {
198       Iterator it = oxmVersionContextMap.entrySet().iterator();
199       while (it.hasNext()) {
200         Map.Entry pair = (Map.Entry) it.next();
201
202         String version = pair.getKey().toString();
203         int versionNum = Integer.parseInt(version.substring(1, version.length()));
204
205         if (versionNum > latestVersion) {
206           latestVersion = versionNum;
207           oxmVersion = pair.getKey().toString();
208         }
209
210         logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
211       }
212     } else {
213       logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
214     }
215   }
216
217
218
219   /**
220    * This will be used in: updateSearchEntityWithCrossEntityReference not this scope Convert object
221    * to json.
222    *
223    * @param object the object
224    * @param pretty the pretty
225    * @return the string
226    * @throws JsonProcessingException the json processing exception
227    * 
228    *         protected static String convertObjectToJson(Object object, boolean pretty) throws
229    *         JsonProcessingException { ObjectWriter ow;
230    * 
231    *         if (pretty) { ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
232    * 
233    *         } else { ow = new ObjectMapper().writer(); }
234    * 
235    *         return ow.writeValueAsString(object); }
236    */
237
238   protected void returnWithError(Exchange exchange, String payload, String errorMsg) {
239     logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, errorMsg);
240     logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, errorMsg, payload);
241     setResponse(exchange, ResponseType.FAILURE, additionalInfo);
242   }
243
244   private boolean isJSONValid(String test) {
245     try {
246       new JSONObject(test);
247     } catch (JSONException ex) {
248       return false;
249     }
250     return true;
251   }
252
253
254
255   protected String getSpikeEventAction(Exchange exchange, String uebPayload) {
256     JSONObject mainJson = new JSONObject(uebPayload);
257     String action = mainJson.getString(OPERATION_KEY);
258     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
259       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
260           "Unrecognized action '" + action + "'", uebPayload);
261       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
262           "Unrecognized action '" + action + "'");
263       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
264       return null;
265     }
266     return action;
267   }
268
269   protected String getExchangeBody(Exchange exchange) {
270     String uebPayload = exchange.getIn().getBody().toString();
271     if (uebPayload == null || !isJSONValid(uebPayload)) {
272       uebPayload = exchange.getIn().getBody(String.class);
273       if (uebPayload == null || !isJSONValid(uebPayload)) {
274         returnWithError(exchange, uebPayload, "Invalid Payload");
275         return null;
276       }
277     }
278     return uebPayload;
279   }
280
281   protected SpikeEventVertex populateEventVertex(Exchange exchange, String uebPayload)
282       throws Exception {
283
284     // Load the UEB payload data, any errors will result in a failure and discard
285
286     JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
287     if (spikeObjVertex == null) {
288       returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
289       return null;
290     }
291
292     SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
293     return eventVertex;
294   }
295
296   protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
297     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
298     if (oxmJaxbContext == null) {
299       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
300       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
301
302       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
303       return null;
304     }
305     return oxmJaxbContext;
306   }
307
308
309   protected String getEntityType(Exchange exchange, SpikeEventVertex eventVertex,
310       String uebPayload) {
311
312     String entityType = eventVertex.getType();
313     if (entityType == null || entityType.isEmpty()) {
314       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
315           "Payload header missing entity type", uebPayload);
316       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
317           "Payload header missing entity type");
318
319       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
320       return null;
321     }
322     return entityType;
323   }
324
325
326
327   protected String getEntityLink(Exchange exchange, SpikeEventVertex eventVertex,
328       String uebPayload) {
329     String entityKey = eventVertex.getKey();
330     if (entityKey == null || entityKey.isEmpty()) {
331       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
332           uebPayload);
333       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
334           "Payload vertex missing entity key");
335
336       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
337       return null;
338     }
339     //EntityLink never can be null if  entityKey is not null. no need to check
340     return eventVertex.getEntityLink();
341
342   }
343
344
345
346   /*
347    * Use the OXM Model to determine the primary key field name based on the entity-type
348    */
349   protected SpikeEventEntity populateSpikeEventEntity(Exchange exchange,
350       SpikeEventEntity spikeEventEntity, DynamicJAXBContext oxmJaxbContext, String entityType,
351       String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
352      
353     String entityPrimaryKeyFieldName =
354         getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
355     if (entityPrimaryKeyFieldName == null) {
356       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
357           "Payload missing primary key attribute");
358       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
359           "Payload missing primary key attribute", uebPayload);
360       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
361       return null;
362     }
363     String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
364     if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
365       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
366           "Payload missing primary value attribute");
367       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
368           "Payload missing primary value attribute", uebPayload);
369
370       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
371       return null;
372     }
373
374
375     if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
376       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
377           "Payload missing searchable attribute for entity type '" + entityType + "'");
378       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
379           "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
380
381       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
382       return null;
383     }
384     spikeEventEntity.setEntityPrimaryKeyName(entityPrimaryKeyFieldName);
385     spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldName);
386
387     try {
388       spikeEventEntity.deriveFields();
389
390     } catch (NoSuchAlgorithmException e) {
391       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
392       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
393           uebPayload);
394
395       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
396       return null;
397     }
398     return spikeEventEntity;
399   }
400
401   protected void setResponse(Exchange exchange, ResponseType responseType, String additionalInfo) {
402
403     exchange.getOut().setHeader("ResponseType", responseType.toString());
404     exchange.getOut().setBody(additionalInfo);
405   }
406
407
408  protected String getOxmEntityType(String entityType) {
409    return new OxmEntityTypeConverter().convert(entityType);
410  }
411  
412  protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
413      String entityType, String uebPayload,Exchange exchange) {
414    List<String> searchableAttr =
415        getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
416    if (searchableAttr == null) {
417      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
418          "Searchable attribute not found for payload entity type '" + entityType + "'");
419      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
420          "Searchable attribute not found for payload entity type '" + entityType + "'",
421          uebPayload);
422
423      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
424      return null;
425    }
426    return searchableAttr;
427  }
428
429
430   private SpikeEventVertex initializeSpikeEventVertex(String payload) {
431
432     SpikeEventVertex eventVertex = null;
433     ObjectMapper mapper = new ObjectMapper();
434
435     // Make sure that were were actually passed in a valid string.
436     if (payload == null || payload.isEmpty()) {
437       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
438       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
439
440       return eventVertex;
441     }
442
443     // Marshal the supplied string into a UebEventHeader object.
444     try {
445       eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
446     } catch (JsonProcessingException e) {
447       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
448     } catch (Exception e) {
449       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
450     }
451
452     if (eventVertex != null) {
453       logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
454     }
455
456     return eventVertex;
457   }
458
459
460   private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
461       String oxmEntityType, String entityType) {
462
463     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
464     if (entity == null) {
465       return null;
466     }
467
468     List<DatabaseField> list = entity.getDescriptor().getPrimaryKeyFields();
469     if (list != null && !list.isEmpty()) {
470       String keyName = list.get(0).getName();
471       return keyName.substring(0, keyName.indexOf('/'));
472     }
473
474     return "";
475   }
476
477   private String lookupValueUsingKey(String payload, String key) throws JSONException {
478     JsonNode jsonNode = convertToJsonNode(payload);
479     return RouterServiceUtil.recursivelyLookupJsonPayload(jsonNode, key);
480   }
481
482
483   private JsonNode convertToJsonNode(String payload) {
484
485     ObjectMapper mapper = new ObjectMapper();
486     JsonNode jsonNode = null;
487     try {
488       jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
489     } catch (IOException e) {
490       logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
491           payload);
492       logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
493           "");
494     }
495
496     return jsonNode;
497   }
498
499
500   private boolean getSearchTags(SpikeEventEntity spikeEventEntity, List<String> searchableAttr,
501       String payload, String action) {
502
503     boolean hasSearchableAttr = false;
504     for (String searchTagField : searchableAttr) {
505       String searchTagValue;
506       if (searchTagField.equalsIgnoreCase(spikeEventEntity.getEntityPrimaryKeyName())) {
507         searchTagValue = spikeEventEntity.getEntityPrimaryKeyValue();
508       } else {
509         searchTagValue = this.lookupValueUsingKey(payload, searchTagField);
510       }
511
512       if (searchTagValue != null && !searchTagValue.isEmpty()) {
513         hasSearchableAttr = true;
514         spikeEventEntity.addSearchTagWithKey(searchTagValue, searchTagField);
515       }
516     }
517     return hasSearchableAttr;
518   }
519
520   /*
521    * Check if OXM version is available. If available, load it.
522    */
523   private DynamicJAXBContext loadOxmContext(String version) {
524     if (version == null) {
525       logger.error(EntityEventPolicyMsgs.FAILED_TO_FIND_OXM_VERSION, version);
526       return null;
527     }
528
529     return oxmVersionContextMap.get(version);
530   }
531
532   private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
533       String entityType, String fieldName) {
534
535     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
536     if (entity == null) {
537       return null;
538     }
539
540     /*
541      * Check for searchable XML tag
542      */
543     List<String> fieldValues = null;
544     Map<String, String> properties = entity.getDescriptor().getProperties();
545     for (Map.Entry<String, String> entry : properties.entrySet()) {
546       if (entry.getKey().equalsIgnoreCase(fieldName)) {
547         fieldValues = Arrays.asList(entry.getValue().split(","));
548         break;
549       }
550     }
551
552     return fieldValues;
553   }
554
555   protected SpikeEventEntity getPopulatedEntity(JsonNode entityNode,
556       OxmEntityDescriptor resultDescriptor) {
557     SpikeEventEntity d = new SpikeEventEntity();
558
559     d.setEntityType(resultDescriptor.getEntityName());
560
561     List<String> primaryKeyValues = new ArrayList<>();
562     List<String> primaryKeyNames = new ArrayList<>();
563     String pkeyValue;
564
565     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
566       pkeyValue = RouterServiceUtil.getNodeFieldAsText(entityNode, keyName);
567       if (pkeyValue != null) {
568         primaryKeyValues.add(pkeyValue);
569         primaryKeyNames.add(keyName);
570       } else {
571         // logger.warn("getPopulatedDocument(), pKeyValue is null for entityType = " +
572         // resultDescriptor.getEntityName());
573         logger.error(EntityEventPolicyMsgs.PRIMARY_KEY_NULL_FOR_ENTITY_TYPE,
574             resultDescriptor.getEntityName());
575       }
576     }
577
578     final String primaryCompositeKeyValue = RouterServiceUtil.concatArray(primaryKeyValues, "/");
579     d.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
580     final String primaryCompositeKeyName = RouterServiceUtil.concatArray(primaryKeyNames, "/");
581     d.setEntityPrimaryKeyName(primaryCompositeKeyName);
582
583     final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
584
585     /*
586      * Based on configuration, use the configured field names for this entity-Type to build a
587      * multi-value collection of search tags for elastic search entity search criteria.
588      */
589
590
591     for (String searchTagField : searchTagFields) {
592       String searchTagValue = RouterServiceUtil.getNodeFieldAsText(entityNode, searchTagField);
593       if (searchTagValue != null && !searchTagValue.isEmpty()) {
594         d.addSearchTagWithKey(searchTagValue, searchTagField);
595       }
596     }
597
598     return d;
599   }
600
601
602
603   // put this here until we find a better spot
604   /**
605    * Helper utility to concatenate substrings of a URI together to form a proper URI.
606    * 
607    * @param suburis the list of substrings to concatenate together
608    * @return the concatenated list of substrings
609    */
610   private static String concatSubUri(String... suburis) {
611     String finalUri = "";
612
613     for (String suburi : suburis) {
614
615       if (suburi != null) {
616         // Remove any leading / since we only want to append /
617         suburi = suburi.replaceFirst("^/*", "");
618
619         // Add a trailing / if one isn't already there
620         finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
621       }
622     }
623
624     return finalUri;
625   }
626
627
628
629   /**
630    * Perform create, read, update or delete (CRUD) operation on search engine's suggestive search
631    * index
632    * 
633    * @param eventEntity Entity/data to use in operation
634    * @param action The operation to perform
635    */
636   protected void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
637       String index) {
638     try {
639
640       Map<String, List<String>> headers = new HashMap<>();
641       headers.put(Headers.FROM_APP_ID, Arrays.asList("DataLayer"));
642       headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
643
644       String entityId = eventEntity.getId();
645
646       if ((action.equalsIgnoreCase(CREATE) && entityId != null)
647           || action.equalsIgnoreCase(UPDATE)) {
648
649         // Run the GET to retrieve the ETAG from the search service
650         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
651
652         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
653           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
654
655           if (etag != null && !etag.isEmpty()) {
656             headers.put(Headers.IF_MATCH, etag);
657           } else {
658             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
659           }
660         }
661
662         // Write the entity to the search service.
663         // PUT
664         searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
665       } else if (action.equalsIgnoreCase(CREATE)) {
666         // Write the entry to the search service.
667         searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
668
669       } else if (action.equalsIgnoreCase(DELETE)) {
670         // Run the GET to retrieve the ETAG from the search service
671         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
672
673         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
674           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
675
676           if (etag != null && !etag.isEmpty()) {
677             headers.put(Headers.IF_MATCH, etag);
678           } else {
679             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
680           }
681
682           /*
683            * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
684            * we specify a Content-Type.
685            */
686
687           headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
688
689           searchAgent.deleteDocument(index, eventEntity.getId(), headers);
690         } else {
691           logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
692         }
693       } else {
694         logger.error(EntityEventPolicyMsgs.ENTITY_OPERATION_NOT_SUPPORTED, action);
695       }
696     } catch (IOException e) {
697       logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, eventEntity.getId(),
698           action);
699     }
700   }
701   
702   protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
703
704     SpikeEventMeta meta = new SpikeEventMeta();
705     Object eventPayloadObj = null;
706     String eventPayload = null;
707     try {
708        eventPayloadObj = exchange.getIn().getBody();    
709       
710       /*
711        * It is expected that mainJson will have multiple top level objects: - header - body - result
712        */
713       if (eventPayloadObj == null) {
714         returnWithError(exchange, null, "Invalid Payload");
715         return null;
716       }
717       
718       eventPayload = (String)eventPayloadObj;
719        
720       meta.setEventEntity(new JSONObject(eventPayload));
721     } catch (JSONException exc) {
722       returnWithError(exchange, eventPayload, "Invalid Payload");
723       return null;
724     }
725
726     JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
727
728     if (eventHeader == null) {
729       returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
730       return null;
731     }
732
733     meta.setEventHeader(eventHeader);
734
735     /*
736      * Only process SPIKE update-notification events
737      */
738
739     final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
740     final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
741
742     if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
743       // drop event
744       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
745           + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
746       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
747           "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
748
749       /*
750        * I don't think ignoring a non-applicable event constitutes a failure.
751        */
752
753       setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
754       return null;
755     }
756
757     JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
758
759     if (eventBody == null) {
760       returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
761       return null;
762     }
763
764     meta.setEventBody(eventBody);
765
766     String action = eventBody.getString(OPERATION_KEY);
767     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
768       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
769           "Unrecognized action '" + action + "'", eventPayload);
770       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
771           "Unrecognized action '" + action + "'");
772       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
773       return null;
774     }
775     meta.setBodyOperationType(action);
776
777     // Load the event body data, any errors will result in a failure and discard
778
779     JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
780     if (spikeVertex == null) {
781       returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
782       return null;
783     }
784
785     meta.setSpikeVertex(spikeVertex);
786
787     SpikeEventVertex spikeEventVertex = null;
788     try {
789       spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
790     } catch (JSONException exc) {
791       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
792           "Error initializating spike event.  Error: " + exc.getMessage(), eventPayload);
793       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
794           "Error initializating spike event.  Error: " + exc.getMessage());
795
796       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
797       return null;
798     }
799
800     meta.setSpikeEventVertex(spikeEventVertex);
801
802     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
803     if (oxmJaxbContext == null) {
804       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
805       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
806           eventPayload);
807
808       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
809       return null;
810     }
811
812     meta.setOxmJaxbContext(oxmJaxbContext);
813
814     String entityType = spikeEventVertex.getType();
815     if (entityType == null || entityType.isEmpty()) {
816       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
817           "Payload header missing entity type", eventPayload);
818       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
819           "Payload header missing entity type");
820
821       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
822       return null;
823     }
824     
825     /*
826      * test if entityType is in the model
827      */
828
829     VersionedOxmEntities oxmEntities =
830     EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
831     
832     if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
833       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
834           eventPayload);
835       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
836           "No matching OXM Descriptor for entity-type='" + entityType + "'");
837
838       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
839       return null;
840     }
841
842     
843     String entityKey = spikeEventVertex.getKey();
844     if (entityKey == null || entityKey.isEmpty()) {
845       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
846           eventPayload);
847       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
848           "Payload vertex missing entity key");
849
850       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
851       return null;
852     }
853     String entityLink = spikeEventVertex.getEntityLink();
854     if (entityLink == null || entityLink.isEmpty()) {
855       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
856           "Payload header missing entity link", eventPayload);
857       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
858           "Payload header missing entity link");
859
860       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
861       return null;
862     }
863
864     JSONObject vertexProperties = null;
865     try {
866
867       vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
868
869     } catch (JSONException exc) {
870       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
871           "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
872       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
873           "Payload header missing " + VERTEX_PROPERTIES_KEY);
874
875       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
876       return null;
877     }
878
879     meta.setVertexProperties(vertexProperties);
880
881     // log the fact that all data are in good shape
882     logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
883     logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
884         eventPayload);
885
886     return meta;
887
888   }
889   
890   protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
891
892     /*
893      * These are all critical keys
894      */
895
896     final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
897     final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
898     final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
899
900     SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
901     eventVertex.setSchemaVersion(vertexSchemaVersion);
902     logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
903
904     return eventVertex;
905
906   }
907   
908 }