Fix aggregate-vnf search index population
[aai/data-router.git] / src / main / java / org / onap / aai / datarouter / policy / AbstractSpikeEntityEventProcessor.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.datarouter.policy;
22
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.security.NoSuchAlgorithmException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34
35 import org.apache.camel.Exchange;
36 import org.apache.camel.Processor;
37 import org.eclipse.persistence.dynamic.DynamicType;
38 import org.eclipse.persistence.internal.helper.DatabaseField;
39 import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
40 import org.eclipse.persistence.oxm.MediaType;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45 import org.onap.aai.cl.mdc.MdcContext;
46 import org.onap.aai.schema.OxmModelLoader;
47 import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
48 import org.onap.aai.datarouter.entity.SpikeEventEntity;
49 import org.onap.aai.datarouter.entity.SpikeEventMeta;
50 import org.onap.aai.datarouter.entity.SpikeEventVertex;
51 import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
52 import org.onap.aai.datarouter.util.RouterServiceUtil;
53 import org.onap.aai.datarouter.util.SearchServiceAgent;
54 import org.onap.aai.entity.OxmEntityDescriptor;
55 import org.onap.aai.util.EntityOxmReferenceHelper;
56 import org.onap.aai.util.ExternalOxmModelProcessor;
57 import org.onap.aai.util.Version;
58 import org.onap.aai.util.VersionedOxmEntities;
59 import org.onap.aai.restclient.client.Headers;
60 import org.onap.aai.restclient.client.OperationResult;
61 import org.onap.aai.restclient.rest.HttpUtil;
62 import org.slf4j.MDC;
63
64 import com.fasterxml.jackson.core.JsonProcessingException;
65 import com.fasterxml.jackson.databind.JsonNode;
66 import com.fasterxml.jackson.databind.ObjectMapper;
67
68 public abstract class AbstractSpikeEntityEventProcessor implements Processor {
69
70   protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
71   private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
72
73   protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
74
75   protected static final String UPDATE_NOTIFICATION = "update-notification";
76   protected static final String SPIKE = "SPIKE";
77   
78   protected static final String HEADER_KEY = "header";
79   protected static final String EVENT_TYPE_KEY = "event-type";
80   protected static final String SOURCE_NAME_KEY = "source-name";
81   protected static final String BODY_KEY = "body";
82   protected static final String OPERATION_KEY = "operation";
83   
84   protected static final String VERTEX_KEY = "vertex";
85   protected static final String VERTEX_PROPERTIES_KEY = "properties";
86   protected static final String VERTEX_KEY_KEY = "key";
87   protected static final String VERTEX_TYPE_KEY = "type";
88   protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
89   
90   protected static final String CREATE = "create";
91   protected static final String DELETE = "delete";
92   protected static final String UPDATE = "update";
93   
94   protected enum ResponseType {
95     SUCCESS, PARTIAL_SUCCESS, FAILURE;
96   }
97
98   protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
99
100   protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
101   protected String oxmVersion = null;
102
103   /** Agent for communicating with the Search Service. */
104   protected SearchServiceAgent searchAgent = null;
105   protected String searchIndexName;
106   protected String searchIndexSchema;
107   protected String createIndexUrl;
108
109   protected Logger logger;
110   protected Logger metricsLogger;
111   protected ObjectMapper mapper;
112
113
114   public AbstractSpikeEntityEventProcessor(SpikeEventPolicyConfig config)
115       throws FileNotFoundException {
116     mapper = new ObjectMapper();
117     LoggerFactory loggerFactoryInstance = LoggerFactory.getInstance();
118     logger = loggerFactoryInstance.getLogger(AbstractSpikeEntityEventProcessor.class.getName());
119     metricsLogger =
120         loggerFactoryInstance.getMetricsLogger(AbstractSpikeEntityEventProcessor.class.getName());
121
122     // Instantiate the agent that we will use for interacting with the Search Service.
123     searchAgent = new SearchServiceAgent(config.getSearchCertName(), config.getSearchKeystore(),
124         config.getSearchKeystorePwd(), AbstractSpikeEntityEventProcessor
125             .concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint()),
126         config.getSearchEndpointDocuments(), logger);
127
128     this.externalOxmModelProcessors = new ArrayList<>();
129     this.externalOxmModelProcessors.add(EntityOxmReferenceHelper.getInstance());
130     OxmModelLoader.registerExternalOxmModelProcessors(externalOxmModelProcessors);
131     OxmModelLoader.loadModels();
132     oxmVersionContextMap = OxmModelLoader.getVersionContextMap();
133     parseLatestOxmVersion();
134   }
135
136   public String getCreateIndexUrl() {
137     return createIndexUrl;
138   }
139
140
141   public void setCreateIndexUrl(String createIndexUrl) {
142     this.createIndexUrl = createIndexUrl;
143   }
144   
145   public String getSearchIndexName() {
146     return searchIndexName;
147   }
148
149
150   public void setSearchIndexName(String searchIndexName) {
151     this.searchIndexName = searchIndexName;
152   }
153
154   public String getSearchIndexSchema() {
155     return searchIndexSchema;
156   }
157
158
159   public void setSearchIndexSchema(String searchIndexSchema) {
160     this.searchIndexSchema = searchIndexSchema;
161   }
162
163   protected void startup() {
164
165   }
166
167   /*
168    * Load the UEB JSON payload, any errors would result to a failure case response.
169    */
170   protected JSONObject getUebContentAsJson(String payload, String contentKey) {
171
172     JSONObject uebJsonObj;
173     JSONObject uebObjContent;
174
175     try {
176       uebJsonObj = new JSONObject(payload);
177     } catch (JSONException e) {
178       logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
179       logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
180       return null;
181     }
182
183     if (uebJsonObj.has(contentKey)) {
184       uebObjContent = uebJsonObj.getJSONObject(contentKey);
185     } else {
186       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
187       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
188       return null;
189     }
190
191     return uebObjContent;
192   }
193   public abstract void process(Exchange exchange) throws Exception;
194
195
196   private void parseLatestOxmVersion() {
197     int latestVersion = -1;
198     if (oxmVersionContextMap != null) {
199       Iterator it = oxmVersionContextMap.entrySet().iterator();
200       while (it.hasNext()) {
201         Map.Entry pair = (Map.Entry) it.next();
202
203         String version = pair.getKey().toString();
204         int versionNum = Integer.parseInt(version.substring(1, version.length()));
205
206         if (versionNum > latestVersion) {
207           latestVersion = versionNum;
208           oxmVersion = pair.getKey().toString();
209         }
210
211         logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
212       }
213     } else {
214       logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
215     }
216   }
217
218
219
220   /**
221    * This will be used in: updateSearchEntityWithCrossEntityReference not this scope Convert object
222    * to json.
223    *
224    * @param object the object
225    * @param pretty the pretty
226    * @return the string
227    * @throws JsonProcessingException the json processing exception
228    * 
229    *         protected static String convertObjectToJson(Object object, boolean pretty) throws
230    *         JsonProcessingException { ObjectWriter ow;
231    * 
232    *         if (pretty) { ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
233    * 
234    *         } else { ow = new ObjectMapper().writer(); }
235    * 
236    *         return ow.writeValueAsString(object); }
237    */
238
239   protected void returnWithError(Exchange exchange, String payload, String errorMsg) {
240     logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, errorMsg);
241     logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, errorMsg, payload);
242     setResponse(exchange, ResponseType.FAILURE, additionalInfo);
243   }
244
245   private boolean isJSONValid(String test) {
246     try {
247       new JSONObject(test);
248     } catch (JSONException ex) {
249       return false;
250     }
251     return true;
252   }
253
254
255
256   protected String getSpikeEventAction(Exchange exchange, String uebPayload) {
257     JSONObject mainJson = new JSONObject(uebPayload);
258     String action = mainJson.getString(OPERATION_KEY);
259     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
260       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
261           "Unrecognized action '" + action + "'", uebPayload);
262       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
263           "Unrecognized action '" + action + "'");
264       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
265       return null;
266     }
267     return action;
268   }
269
270   protected String getExchangeBody(Exchange exchange) {
271     String uebPayload = exchange.getIn().getBody().toString();
272     if (uebPayload == null || !isJSONValid(uebPayload)) {
273       uebPayload = exchange.getIn().getBody(String.class);
274       if (uebPayload == null || !isJSONValid(uebPayload)) {
275         returnWithError(exchange, uebPayload, "Invalid Payload");
276         return null;
277       }
278     }
279     return uebPayload;
280   }
281
282   protected SpikeEventVertex populateEventVertex(Exchange exchange, String uebPayload)
283       throws Exception {
284
285     // Load the UEB payload data, any errors will result in a failure and discard
286
287     JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
288     if (spikeObjVertex == null) {
289       returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
290       return null;
291     }
292
293     SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
294     return eventVertex;
295   }
296
297   protected DynamicJAXBContext readOxm(Exchange exchange, String uebPayload) {
298     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
299     if (oxmJaxbContext == null) {
300       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
301       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
302
303       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
304       return null;
305     }
306     return oxmJaxbContext;
307   }
308
309
310   protected String getEntityType(Exchange exchange, SpikeEventVertex eventVertex,
311       String uebPayload) {
312
313     String entityType = eventVertex.getType();
314     if (entityType == null || entityType.isEmpty()) {
315       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
316           "Payload header missing entity type", uebPayload);
317       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
318           "Payload header missing entity type");
319
320       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
321       return null;
322     }
323     return entityType;
324   }
325
326
327
328   protected String getEntityLink(Exchange exchange, SpikeEventVertex eventVertex,
329       String uebPayload) {
330     String entityKey = eventVertex.getKey();
331     if (entityKey == null || entityKey.isEmpty()) {
332       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
333           uebPayload);
334       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
335           "Payload vertex missing entity key");
336
337       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
338       return null;
339     }
340     //EntityLink never can be null if  entityKey is not null. no need to check
341     return eventVertex.getEntityLink();
342
343   }
344
345
346
347   /*
348    * Use the OXM Model to determine the primary key field name based on the entity-type
349    */
350   protected SpikeEventEntity populateSpikeEventEntity(Exchange exchange,
351       SpikeEventEntity spikeEventEntity, DynamicJAXBContext oxmJaxbContext, String entityType,
352       String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
353      
354     String entityPrimaryKeyFieldName =
355         getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
356     if (entityPrimaryKeyFieldName == null) {
357       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
358           "Payload missing primary key attribute");
359       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
360           "Payload missing primary key attribute", uebPayload);
361       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
362       return null;
363     }
364     String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
365     if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
366       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
367           "Payload missing primary value attribute");
368       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
369           "Payload missing primary value attribute", uebPayload);
370
371       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
372       return null;
373     }
374
375
376     if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
377       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
378           "Payload missing searchable attribute for entity type '" + entityType + "'");
379       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
380           "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
381
382       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
383       return null;
384     }
385     spikeEventEntity.setEntityPrimaryKeyName(entityPrimaryKeyFieldName);
386     spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldName);
387
388     try {
389       spikeEventEntity.deriveFields();
390
391     } catch (NoSuchAlgorithmException e) {
392       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
393       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
394           uebPayload);
395
396       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
397       return null;
398     }
399     return spikeEventEntity;
400   }
401
402   protected void setResponse(Exchange exchange, ResponseType responseType, String additionalInfo) {
403
404     exchange.getOut().setHeader("ResponseType", responseType.toString());
405     exchange.getOut().setBody(additionalInfo);
406   }
407
408
409  protected String getOxmEntityType(String entityType) {
410    return new OxmEntityTypeConverter().convert(entityType);
411  }
412  
413  protected List<String> getSearchableAttibutes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
414      String entityType, String uebPayload,Exchange exchange) {
415    List<String> searchableAttr =
416        getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
417    if (searchableAttr == null) {
418      logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
419          "Searchable attribute not found for payload entity type '" + entityType + "'");
420      logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
421          "Searchable attribute not found for payload entity type '" + entityType + "'",
422          uebPayload);
423
424      setResponse(exchange, ResponseType.FAILURE, additionalInfo);
425      return null;
426    }
427    return searchableAttr;
428  }
429
430
431   private SpikeEventVertex initializeSpikeEventVertex(String payload) {
432
433     SpikeEventVertex eventVertex = null;
434     ObjectMapper mapper = new ObjectMapper();
435
436     // Make sure that were were actually passed in a valid string.
437     if (payload == null || payload.isEmpty()) {
438       logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
439       logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
440
441       return eventVertex;
442     }
443
444     // Marshal the supplied string into a UebEventHeader object.
445     try {
446       eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
447     } catch (JsonProcessingException e) {
448       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
449     } catch (Exception e) {
450       logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
451     }
452
453     if (eventVertex != null) {
454       logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
455     }
456
457     return eventVertex;
458   }
459
460
461   private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
462       String oxmEntityType, String entityType) {
463
464     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
465     if (entity == null) {
466       return null;
467     }
468
469     List<DatabaseField> list = entity.getDescriptor().getPrimaryKeyFields();
470     if (list != null && !list.isEmpty()) {
471       String keyName = list.get(0).getName();
472       return keyName.substring(0, keyName.indexOf('/'));
473     }
474
475     return "";
476   }
477
478   private String lookupValueUsingKey(String payload, String key) throws JSONException {
479     JsonNode jsonNode = convertToJsonNode(payload);
480     return RouterServiceUtil.recursivelyLookupJsonPayload(jsonNode, key);
481   }
482
483
484   private JsonNode convertToJsonNode(String payload) {
485
486     ObjectMapper mapper = new ObjectMapper();
487     JsonNode jsonNode = null;
488     try {
489       jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
490     } catch (IOException e) {
491       logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
492           payload);
493       logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
494           "");
495     }
496
497     return jsonNode;
498   }
499
500
501   private boolean getSearchTags(SpikeEventEntity spikeEventEntity, List<String> searchableAttr,
502       String payload, String action) {
503
504     boolean hasSearchableAttr = false;
505     for (String searchTagField : searchableAttr) {
506       String searchTagValue;
507       if (searchTagField.equalsIgnoreCase(spikeEventEntity.getEntityPrimaryKeyName())) {
508         searchTagValue = spikeEventEntity.getEntityPrimaryKeyValue();
509       } else {
510         searchTagValue = this.lookupValueUsingKey(payload, searchTagField);
511       }
512
513       if (searchTagValue != null && !searchTagValue.isEmpty()) {
514         hasSearchableAttr = true;
515         spikeEventEntity.addSearchTagWithKey(searchTagValue, searchTagField);
516       }
517     }
518     return hasSearchableAttr;
519   }
520
521   /*
522    * Check if OXM version is available. If available, load it.
523    */
524   private DynamicJAXBContext loadOxmContext(String version) {
525     if (version == null) {
526       logger.error(EntityEventPolicyMsgs.FAILED_TO_FIND_OXM_VERSION, version);
527       return null;
528     }
529
530     return oxmVersionContextMap.get(version);
531   }
532
533   private List<String> getOxmAttributes(DynamicJAXBContext oxmJaxbContext, String oxmEntityType,
534       String entityType, String fieldName) {
535
536     DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
537     if (entity == null) {
538       return null;
539     }
540
541     /*
542      * Check for searchable XML tag
543      */
544     List<String> fieldValues = null;
545     Map<String, String> properties = entity.getDescriptor().getProperties();
546     for (Map.Entry<String, String> entry : properties.entrySet()) {
547       if (entry.getKey().equalsIgnoreCase(fieldName)) {
548         fieldValues = Arrays.asList(entry.getValue().split(","));
549         break;
550       }
551     }
552
553     return fieldValues;
554   }
555
556   protected SpikeEventEntity getPopulatedEntity(JsonNode entityNode,
557       OxmEntityDescriptor resultDescriptor) {
558     SpikeEventEntity d = new SpikeEventEntity();
559
560     d.setEntityType(resultDescriptor.getEntityName());
561
562     List<String> primaryKeyValues = new ArrayList<>();
563     List<String> primaryKeyNames = new ArrayList<>();
564     String pkeyValue;
565
566     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
567       pkeyValue = RouterServiceUtil.getNodeFieldAsText(entityNode, keyName);
568       if (pkeyValue != null) {
569         primaryKeyValues.add(pkeyValue);
570         primaryKeyNames.add(keyName);
571       } else {
572         // logger.warn("getPopulatedDocument(), pKeyValue is null for entityType = " +
573         // resultDescriptor.getEntityName());
574         logger.error(EntityEventPolicyMsgs.PRIMARY_KEY_NULL_FOR_ENTITY_TYPE,
575             resultDescriptor.getEntityName());
576       }
577     }
578
579     final String primaryCompositeKeyValue = RouterServiceUtil.concatArray(primaryKeyValues, "/");
580     d.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
581     final String primaryCompositeKeyName = RouterServiceUtil.concatArray(primaryKeyNames, "/");
582     d.setEntityPrimaryKeyName(primaryCompositeKeyName);
583
584     final List<String> searchTagFields = resultDescriptor.getSearchableAttributes();
585
586     /*
587      * Based on configuration, use the configured field names for this entity-Type to build a
588      * multi-value collection of search tags for elastic search entity search criteria.
589      */
590
591
592     for (String searchTagField : searchTagFields) {
593       String searchTagValue = RouterServiceUtil.getNodeFieldAsText(entityNode, searchTagField);
594       if (searchTagValue != null && !searchTagValue.isEmpty()) {
595         d.addSearchTagWithKey(searchTagValue, searchTagField);
596       }
597     }
598
599     return d;
600   }
601
602
603
604   // put this here until we find a better spot
605   /**
606    * Helper utility to concatenate substrings of a URI together to form a proper URI.
607    * 
608    * @param suburis the list of substrings to concatenate together
609    * @return the concatenated list of substrings
610    */
611   private static String concatSubUri(String... suburis) {
612     String finalUri = "";
613
614     for (String suburi : suburis) {
615
616       if (suburi != null) {
617         // Remove any leading / since we only want to append /
618         suburi = suburi.replaceFirst("^/*", "");
619
620         // Add a trailing / if one isn't already there
621         finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
622       }
623     }
624
625     return finalUri;
626   }
627
628
629
630   /**
631    * Perform create, read, update or delete (CRUD) operation on search engine's suggestive search
632    * index
633    * 
634    * @param eventEntity Entity/data to use in operation
635    * @param action The operation to perform
636    */
637   protected void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
638       String index) {
639     try {
640
641       Map<String, List<String>> headers = new HashMap<>();
642       headers.put(Headers.FROM_APP_ID, Arrays.asList("DataLayer"));
643       headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
644
645       String entityId = eventEntity.getId();
646
647       if ((action.equalsIgnoreCase(CREATE) && entityId != null)
648           || action.equalsIgnoreCase(UPDATE)) {
649
650         // Run the GET to retrieve the ETAG from the search service
651         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
652
653         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
654           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
655
656           if (etag != null && !etag.isEmpty()) {
657             headers.put(Headers.IF_MATCH, etag);
658           } else {
659             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
660           }
661         }
662
663         // Write the entity to the search service.
664         // PUT
665         searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
666       } else if (action.equalsIgnoreCase(CREATE)) {
667         // Write the entry to the search service.
668         searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
669
670       } else if (action.equalsIgnoreCase(DELETE)) {
671         // Run the GET to retrieve the ETAG from the search service
672         OperationResult storedEntity = searchAgent.getDocument(index, entityId);
673
674         if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
675           List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
676
677           if (etag != null && !etag.isEmpty()) {
678             headers.put(Headers.IF_MATCH, etag);
679           } else {
680             logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
681           }
682
683           /*
684            * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
685            * we specify a Content-Type.
686            */
687
688           headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
689
690           searchAgent.deleteDocument(index, eventEntity.getId(), headers);
691         } else {
692           logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
693         }
694       } else {
695         logger.error(EntityEventPolicyMsgs.ENTITY_OPERATION_NOT_SUPPORTED, action);
696       }
697     } catch (IOException e) {
698       logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, eventEntity.getId(),
699           action);
700     }
701   }
702   
703   protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
704
705     SpikeEventMeta meta = new SpikeEventMeta();
706     Object eventPayloadObj = null;
707     String eventPayload = null;
708     try {
709        eventPayloadObj = exchange.getIn().getBody();    
710       
711       /*
712        * It is expected that mainJson will have multiple top level objects: - header - body - result
713        */
714       if (eventPayloadObj == null) {
715         returnWithError(exchange, null, "Invalid Payload");
716         return null;
717       }
718       
719       eventPayload = (String)eventPayloadObj;
720        
721       meta.setEventEntity(new JSONObject(eventPayload));
722     } catch (JSONException exc) {
723       returnWithError(exchange, eventPayload, "Invalid Payload");
724       return null;
725     }
726
727     JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
728
729     if (eventHeader == null) {
730       returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
731       return null;
732     }
733
734     meta.setEventHeader(eventHeader);
735
736     /*
737      * Only process SPIKE update-notification events
738      */
739
740     final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
741     final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
742
743     if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
744       // drop event
745       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
746           + sourceName + "' and eventType='" + eventType + "'.  Payload=" + eventPayload);
747       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
748           "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
749
750       /*
751        * I don't think ignoring a non-applicable event constitutes a failure.
752        */
753
754       setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
755       return null;
756     }
757
758     JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
759
760     if (eventBody == null) {
761       returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
762       return null;
763     }
764
765     meta.setEventBody(eventBody);
766
767     String action = eventBody.getString(OPERATION_KEY);
768     if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
769       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
770           "Unrecognized action '" + action + "'", eventPayload);
771       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
772           "Unrecognized action '" + action + "'");
773       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
774       return null;
775     }
776     meta.setBodyOperationType(action);
777
778     // Load the event body data, any errors will result in a failure and discard
779
780     JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
781     if (spikeVertex == null) {
782       returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
783       return null;
784     }
785
786     meta.setSpikeVertex(spikeVertex);
787
788     SpikeEventVertex spikeEventVertex = null;
789     try {
790       spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
791     } catch (JSONException exc) {
792       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
793           "Error initializating spike event.  Error: " + exc.getMessage(), eventPayload);
794       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
795           "Error initializating spike event.  Error: " + exc.getMessage());
796
797       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
798       return null;
799     }
800
801     meta.setSpikeEventVertex(spikeEventVertex);
802
803     DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion);
804     if (oxmJaxbContext == null) {
805       logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
806       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
807           eventPayload);
808
809       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
810       return null;
811     }
812
813     meta.setOxmJaxbContext(oxmJaxbContext);
814
815     String entityType = spikeEventVertex.getType();
816     if (entityType == null || entityType.isEmpty()) {
817       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
818           "Payload header missing entity type", eventPayload);
819       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
820           "Payload header missing entity type");
821
822       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
823       return null;
824     }
825     
826     /*
827      * test if entityType is in the model
828      */
829
830     VersionedOxmEntities oxmEntities =
831     EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion.toLowerCase()));
832     
833     if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
834       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
835           eventPayload);
836       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
837           "No matching OXM Descriptor for entity-type='" + entityType + "'");
838
839       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
840       return null;
841     }
842
843     
844     String entityKey = spikeEventVertex.getKey();
845     if (entityKey == null || entityKey.isEmpty()) {
846       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
847           eventPayload);
848       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
849           "Payload vertex missing entity key");
850
851       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
852       return null;
853     }
854     String entityLink = spikeEventVertex.getEntityLink();
855     if (entityLink == null || entityLink.isEmpty()) {
856       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
857           "Payload header missing entity link", eventPayload);
858       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
859           "Payload header missing entity link");
860
861       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
862       return null;
863     }
864
865     JSONObject vertexProperties = null;
866     try {
867
868       vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
869
870     } catch (JSONException exc) {
871       logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
872           "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
873       logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
874           "Payload header missing " + VERTEX_PROPERTIES_KEY);
875
876       setResponse(exchange, ResponseType.FAILURE, additionalInfo);
877       return null;
878     }
879
880     meta.setVertexProperties(vertexProperties);
881
882     // log the fact that all data are in good shape
883     logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
884     logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
885         eventPayload);
886
887     return meta;
888
889   }
890   
891   protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
892
893     /*
894      * These are all critical keys
895      */
896
897     final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
898     final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
899     final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
900
901     SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
902     eventVertex.setSchemaVersion(vertexSchemaVersion);
903     logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
904
905     return eventVertex;
906
907   }
908   
909   protected List<String> extractSuggestableAttr(VersionedOxmEntities oxmEntities, String entityType) {
910     // Extract suggestable attributeshandleTopographicalData
911     Map<String, OxmEntityDescriptor> rootDescriptor = oxmEntities.getSuggestableEntityDescriptors();
912
913     if (rootDescriptor == null) {
914       return Collections.emptyList();
915     }
916
917     OxmEntityDescriptor desc = rootDescriptor.get(entityType);
918
919     if (desc == null) {
920       return Collections.emptyList();
921     }
922
923     return desc.getSuggestableAttributes();
924   }
925   
926 }