update sync queries to use search data service
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / crossentityreference / sync / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.crossentityreference.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.ExecutorService;
35 import java.util.function.Consumer;
36 import java.util.function.Supplier;
37
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.aai.cl.mdc.MdcContext;
41 import org.onap.aai.restclient.client.OperationResult;
42 import org.onap.aai.sparky.config.oxm.CrossEntityReference;
43 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceDescriptor;
44 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
48 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
49 import org.onap.aai.sparky.dal.NetworkTransaction;
50 import org.onap.aai.sparky.dal.rest.HttpMethod;
51 import org.onap.aai.sparky.logging.AaiUiMsgs;
52 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
53 import org.onap.aai.sparky.sync.IndexSynchronizer;
54 import org.onap.aai.sparky.sync.SynchronizerConstants;
55 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
56 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
57 import org.onap.aai.sparky.sync.entity.IndexableCrossEntityReference;
58 import org.onap.aai.sparky.sync.entity.MergableEntity;
59 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
60 import org.onap.aai.sparky.sync.enumeration.OperationState;
61 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
62 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
63 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
64 import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
65 import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
66 import org.onap.aai.sparky.util.NodeUtils;
67 import org.slf4j.MDC;
68
69 import com.fasterxml.jackson.core.JsonProcessingException;
70 import com.fasterxml.jackson.databind.JsonNode;
71 import com.fasterxml.jackson.databind.ObjectReader;
72 import com.fasterxml.jackson.databind.node.ArrayNode;
73
74 /**
75  * The Class CrossEntityReferenceSynchronizer.
76  */
77 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
78     implements IndexSynchronizer {
79
80   /**
81    * The Class RetryCrossEntitySyncContainer.
82    */
83   private class RetryCrossEntitySyncContainer {
84     NetworkTransaction txn;
85     IndexableCrossEntityReference icer;
86
87     /**
88      * Instantiates a new retry cross entity sync container.
89      *
90      * @param txn the txn
91      * @param icer the icer
92      */
93     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
94         IndexableCrossEntityReference icer) {
95       this.txn = txn;
96       this.icer = icer;
97     }
98
99     public NetworkTransaction getNetworkTransaction() {
100       return txn;
101     }
102
103     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
104       return icer;
105     }
106   }
107
108   private static final Logger LOG =
109       LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);
110
111   private static final String SERVICE_INSTANCE = "service-instance";
112
113   private Deque<SelfLinkDescriptor> selflinks;
114   private Deque<RetryCrossEntitySyncContainer> retryQueue;
115   private Map<String, Integer> retryLimitTracker;
116   private boolean isAllWorkEnumerated;
117   protected ExecutorService esPutExecutor;
118   private CrossEntityReferenceLookup crossEntityReferenceLookup;
119   private OxmEntityLookup oxmEntityLookup;
120   private SearchableEntityLookup searchableEntityLookup;
121   
122
123   /**
124    * Instantiates a new cross entity reference synchronizer.
125    *
126    * @throws Exception the exception
127    */
128   public CrossEntityReferenceSynchronizer(ElasticSearchSchemaConfig schemaConfig,
129       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
130       NetworkStatisticsConfig esStatConfig, CrossEntityReferenceLookup crossEntityReferenceLookup,
131       OxmEntityLookup oxmEntityLookup, SearchableEntityLookup searchableEntityLookup) throws Exception {
132     super(LOG, "CERS", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
133         aaiStatConfig, esStatConfig);
134     this.crossEntityReferenceLookup = crossEntityReferenceLookup;
135     this.oxmEntityLookup = oxmEntityLookup;
136     this.searchableEntityLookup = searchableEntityLookup;
137     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
138     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
139     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
140     this.synchronizerName = "Cross Reference Entity Synchronizer";
141     this.isAllWorkEnumerated = false;
142     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
143     this.aaiEntityStats.intializeEntityCounters(
144         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().keySet());
145
146     this.esEntityStats.intializeEntityCounters(
147         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().keySet());
148     this.syncDurationInMs = -1;
149   }
150
151   /* (non-Javadoc)
152    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
153    */
154   @Override
155   public OperationState doSync() {
156     this.syncDurationInMs = -1;
157         String txnID = NodeUtils.getRandomTxnId();
158     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
159         
160     resetCounters();
161     syncStartedTimeStampInMs = System.currentTimeMillis();
162     launchSyncFlow();
163     return OperationState.OK;
164   }
165
166   @Override
167   public SynchronizerState getState() {
168     if (!isSyncDone()) {
169       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
170     }
171
172     return SynchronizerState.IDLE;
173   }
174
175   /* (non-Javadoc)
176    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
177    */
178   @Override
179   public String getStatReport(boolean showFinalReport) {
180     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
181     return getStatReport(syncDurationInMs, showFinalReport);
182   }
183
184   /* (non-Javadoc)
185    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
186    */
187   @Override
188   public void shutdown() {
189     this.shutdownExecutors();
190   }
191
192   @Override
193   protected boolean isSyncDone() {
194     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
195
196     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
197       return false;
198     }
199
200     return true;
201   }
202
203   /**
204    * Launch sync flow.
205    *
206    * @return the operation state
207    */
208   private OperationState launchSyncFlow() {
209         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
210     Map<String, CrossEntityReferenceDescriptor> descriptorMap =
211         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors();
212
213     if (descriptorMap.isEmpty()) {
214       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
215
216       return OperationState.ERROR;
217     }
218
219     Collection<String> syncTypes = descriptorMap.keySet();
220
221     try {
222
223       /*
224        * launch a parallel async thread to process the documents for each entity-type (to max the of
225        * the configured executor anyway)
226        */
227
228       aaiWorkOnHand.set(syncTypes.size());
229
230       for (String key : syncTypes) {
231
232         supplyAsync(new Supplier<Void>() {
233
234           @Override
235           public Void get() {
236                 MDC.setContextMap(contextMap);
237             OperationResult typeLinksResult = null;
238             try {
239               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
240               aaiWorkOnHand.decrementAndGet();
241               processEntityTypeSelfLinks(typeLinksResult);
242             } catch (Exception exc) {
243               LOG.error(AaiUiMsgs.ERROR_GENERIC,
244                   "An error occurred processing entity selflinks. Error: " + exc.getMessage());
245             }
246
247             return null;
248           }
249
250         }, aaiExecutor).whenComplete((result, error) -> {
251           if (error != null) {
252             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
253           }
254         });
255       }
256
257       while (aaiWorkOnHand.get() != 0) {
258
259         if (LOG.isDebugEnabled()) {
260           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
261         }
262
263         Thread.sleep(1000);
264       }
265
266       aaiWorkOnHand.set(selflinks.size());
267       isAllWorkEnumerated = true;
268       performSync();
269
270       while (!isSyncDone()) {
271         performRetrySync();
272         Thread.sleep(1000);
273       }
274
275       /*
276        * Make sure we don't hang on to retries that failed which could cause issues during future
277        * syncs
278        */
279       retryLimitTracker.clear();
280
281     } catch (Exception exc) {
282       LOG.error(AaiUiMsgs.ERROR_GENERIC,
283           "An error occurred during entity synchronization. Error: " + exc.getMessage());
284
285     }
286
287     return OperationState.OK;
288   }
289
290   /**
291    * Perform sync.
292    */
293   private void performSync() {
294     while (selflinks.peek() != null) {
295
296       SelfLinkDescriptor linkDescriptor = selflinks.poll();
297       aaiWorkOnHand.decrementAndGet();
298
299       CrossEntityReferenceDescriptor descriptor = null;
300
301       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
302
303         descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors()
304             .get(linkDescriptor.getEntityType());
305
306         if (descriptor == null) {
307           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
308           // go to next element in iterator
309           continue;
310         }
311
312         if (descriptor.hasCrossEntityReferences()) {
313
314           NetworkTransaction txn = new NetworkTransaction();
315           txn.setDescriptor(descriptor);
316           txn.setLink(linkDescriptor.getSelfLink());
317           txn.setQueryParameters(linkDescriptor.getDepthModifier());
318           txn.setOperationType(HttpMethod.GET);
319           txn.setEntityType(linkDescriptor.getEntityType());
320
321           aaiWorkOnHand.incrementAndGet();
322
323           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
324               .whenComplete((result, error) -> {
325
326                 aaiWorkOnHand.decrementAndGet();
327
328                 if (error != null) {
329                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
330                 } else {
331                   if (result == null) {
332                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
333                   } else {
334                     updateActiveInventoryCounters(result);
335                     fetchDocumentForUpsert(result);
336                   }
337                 }
338               });
339         }
340       }
341     }
342   }
343
344   /**
345    * Process entity type self links.
346    *
347    * @param operationResult the operation result
348    */
349   private void processEntityTypeSelfLinks(OperationResult operationResult) {
350
351     final String jsonResult = operationResult.getResult();
352
353     if (jsonResult != null && jsonResult.length() > 0) {
354
355       try {
356         JsonNode rootNode = mapper.readTree(jsonResult);
357         JsonNode resultData = rootNode.get("result-data");
358
359         if (resultData.isArray()) {
360           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
361
362           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
363
364           while (elementIterator.hasNext()) {
365             JsonNode element = elementIterator.next();
366
367             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
368             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
369
370             if (resourceType != null && resourceLink != null) {
371               CrossEntityReferenceDescriptor descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
372
373               if (descriptor == null) {
374                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
375                 // go to next element in iterator
376                 continue;
377               }
378               if (descriptor.hasCrossEntityReferences()) {
379                 selflinks.add(new SelfLinkDescriptor(
380                         resourceLink, SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
381               }
382             }
383           }
384         }
385       } catch (IOException exc) {
386         // TODO // TODO -> LOG, waht should be logged here?
387       }
388     }
389   }
390
391   
392   
393   /**
394    * By providing the entity type and a json node for the entity, determine the
395    * primary key name(s) + primary key value(s) sufficient to build an entity query string
396    * of the following format:
397    * 
398    *      <entityType>.<primaryKeyNames>:<primaryKeyValues>
399    * 
400    * @return - a composite string in the above format or null
401    */
402   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
403     
404     OxmEntityDescriptor entityDescriptor =
405         oxmEntityLookup.getEntityDescriptors().get(entityType);
406     
407     String queryString = null;
408     
409     if ( entityDescriptor != null ) {
410
411       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeNames();
412       final List<String> keyValues = new ArrayList<String>();
413       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
414
415       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames,"/") + ":" + NodeUtils.concatArray(keyValues);
416
417     } 
418     
419     return queryString;
420
421     
422   }
423   
424   /**
425    * Fetch document for upsert.
426    *
427    * @param txn the txn
428    */
429   private void fetchDocumentForUpsert(NetworkTransaction txn) {
430     
431     if (!txn.getOperationResult().wasSuccessful()) {
432       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
433       return;
434     }
435
436     CrossEntityReferenceDescriptor cerDescriptor = crossEntityReferenceLookup
437         .getCrossReferenceEntityDescriptors().get(txn.getDescriptor().getEntityName());
438     
439     if (cerDescriptor != null && cerDescriptor.hasCrossEntityReferences()) {
440
441       final String jsonResult = txn.getOperationResult().getResult();
442       
443       if (jsonResult != null && jsonResult.length() > 0) {
444         
445         /**
446          * Here's what we are going to do:
447          * 
448          * <li>Extract primary key name and value from the parent type.
449          * <li>Extract the primary key and value from the nested child instance.
450          * <li>Build a generic query to discover the self-link for the nested-child-instance using
451          * parent and child.
452          * <li>Set the self-link on the child.
453          * <li>Generate the id that will allow the elastic-search upsert to work.
454          * <li>Rinse and repeat.
455          */
456           
457           CrossEntityReference cerDefinition = cerDescriptor.getCrossEntityReference();
458
459           if (cerDefinition != null) {
460             JsonNode convertedNode = null;
461             try {
462               convertedNode = NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
463               
464               final String parentEntityQueryString = determineEntityQueryString(txn.getEntityType(), convertedNode);
465               
466               List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
467
468               NodeUtils.extractFieldValuesFromObject(convertedNode,
469                   cerDefinition.getReferenceAttributes(),
470                   extractedParentEntityAttributeValues);
471
472               List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
473               NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
474                   nestedTargetEntityInstances);
475
476               for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
477
478                 if (cerDescriptor != null) {
479                   
480                   String childEntityType = cerDefinition.getTargetEntityType();
481                   OxmEntityDescriptor childDesciptor = oxmEntityLookup.getEntityDescriptors().get(childEntityType);
482                   
483                   List<String> childPrimaryKeyNames = null;
484                   
485                   if (childDesciptor != null) {
486                     childPrimaryKeyNames = childDesciptor.getPrimaryKeyAttributeNames();
487                   } else {
488                     childPrimaryKeyNames = new ArrayList<String>();
489                   }
490                                 
491                   List<String> childKeyValues = new ArrayList<String>();
492                   NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames, childKeyValues);
493                   
494                   String childEntityQueryKeyString = childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames,"/") + ":" + NodeUtils.concatArray(childKeyValues);
495                   
496                   /**
497                    * Build generic-query to query child instance self-link from AAI
498                    */
499                   List<String> orderedQueryKeyParams = new ArrayList<String>();
500
501                   /**
502                    * At present, there is an issue with resolving the self-link using the
503                    * generic-query with nothing more than the service-instance identifier and the
504                    * service-subscription. There is another level of detail we don't have access to
505                    * unless we parse it out of the service-subscription self-link, which is a
506                    * coupling I would like to avoid. Fortunately, there is a workaround, but only
507                    * for service-instances, which is presently our only use-case for the
508                    * cross-entity-reference in R1707. Going forwards hopefully there will be other
509                    * ways to resolve a child self-link using parental embedded meta data that we
510                    * don't currently have.
511                    * 
512                    * The work-around with the service-instance entity-type is that it's possible to
513                    * request the self-link using only the service-instance-id because of a
514                    * historical AAI functional query requirement that it be possible to query a
515                    * service-instance only by it's service-instance-id. This entity type is the only
516                    * one in the system that can be queried this way which makes it a very limited
517                    * workaround, but good enough for the current release.
518                    */
519
520                   if (SERVICE_INSTANCE.equals(childEntityType)) {
521                     orderedQueryKeyParams.clear();
522                     orderedQueryKeyParams.add(childEntityQueryKeyString);
523                   } else {
524                     orderedQueryKeyParams.add(parentEntityQueryString);
525                     orderedQueryKeyParams.add(childEntityQueryKeyString);
526                   }
527
528                   String genericQueryStr = null;
529                   try {
530                     genericQueryStr = aaiAdapter.getGenericQueryForSelfLink(childEntityType, orderedQueryKeyParams);
531                     
532                     if (genericQueryStr != null) {
533                       aaiWorkOnHand.incrementAndGet();
534
535                       OperationResult aaiQueryResult = aaiAdapter.queryActiveInventoryWithRetries(
536                           genericQueryStr, "application/json",
537                           aaiAdapter.getEndpointConfig().getNumRequestRetries());
538
539                       aaiWorkOnHand.decrementAndGet();
540
541                       if (aaiQueryResult!= null && aaiQueryResult.wasSuccessful()) {
542                         
543                         Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
544                         JsonNode genericQueryResult = null;
545                         try {
546                           genericQueryResult = NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
547                           
548                           if ( genericQueryResult != null ) {
549                             
550                             NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link", entityLinks);
551
552                             String selfLink = null;
553
554                             if (entityLinks.size() != 1) {
555                               /**
556                                * an ambiguity exists where we can't reliably determine the self
557                                * link, this should be a permanent error
558                                */
559                               LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY, String.valueOf(entityLinks.size()));
560                           } else {
561                             selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
562
563
564                             IndexableCrossEntityReference icer =
565                                 getPopulatedDocument(targetEntityInstance, cerDescriptor);
566
567                             for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
568                               icer.addCrossEntityReferenceValue(
569                                   parentCrossEntityReferenceAttributeValue);
570                             }
571
572                             icer.setLink(ActiveInventoryAdapter.extractResourcePath(selfLink));
573
574                             icer.deriveFields();
575
576                             String link = null;
577                             try {
578                               link = searchServiceAdapter
579                                   .buildSearchServiceDocUrl(getIndexName(), icer.getId());
580                             } catch (Exception exc) {
581                               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
582                                   exc.getLocalizedMessage());
583                             }
584
585                             if (link != null) {
586                               NetworkTransaction n2 = new NetworkTransaction();
587                               n2.setLink(link);
588                               n2.setEntityType(txn.getEntityType());
589                               n2.setDescriptor(txn.getDescriptor());
590                               n2.setOperationType(HttpMethod.GET);
591
592                               esWorkOnHand.incrementAndGet();
593
594                               supplyAsync(
595                                       new PerformSearchServiceRetrieval(n2, searchServiceAdapter),
596                                       esExecutor).whenComplete((result, error) -> {
597
598                                     esWorkOnHand.decrementAndGet();
599
600                                     if (error != null) {
601                                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED,
602                                           error.getLocalizedMessage());
603                                     } else {
604                                       updateElasticSearchCounters(result);
605                                       performDocumentUpsert(result, icer);
606                                     }
607                                   });
608                             }
609
610                           }
611                           } else {
612                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
613                           }
614
615                         } catch (Exception exc) {
616                           LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(), exc.getLocalizedMessage());
617                         }
618                         
619                       } else {
620                         String result = aaiQueryResult != null ? aaiQueryResult.getResult() : "unknown";
621                         String message = "Entity sync failed because AAI query failed with error " + result;
622                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
623                       }
624                       
625                     } else {
626                       String message = "Entity Sync failed because generic query str could not be determined.";
627                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
628                     }
629                   } catch (Exception exc) {
630                     String message = "Failed to sync entity because generation of generic query failed with error = " + exc.getMessage();
631                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
632                   }
633                   
634                 }
635               }
636               
637             } catch (IOException ioe) {
638               LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
639             }
640           }
641           
642         } 
643       
644     } else {
645       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
646     }
647   }
648
649   /**
650    * Perform document upsert.
651    *
652    * @param esGetResult the es get result
653    * @param icer the icer
654    */
655   protected void performDocumentUpsert(NetworkTransaction esGetResult,
656       IndexableCrossEntityReference icer) {
657     /**
658      * <p>
659      * <ul>
660      * As part of the response processing we need to do the following:
661      * <li>1. Extract the version (if present), it will be the ETAG when we use the
662      * Search-Abstraction-Service
663      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
664      * tag
665      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
666      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
667      * </ul>
668      * </p>
669      */
670     String link = null;
671     try {
672       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), icer.getId());
673     } catch (Exception exc) {
674       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
675       return;
676     }
677
678     boolean wasEntryDiscovered = false;
679     String versionNumber = null;
680     if (esGetResult.getOperationResult().getResultCode() == 404) {
681       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
682     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
683       wasEntryDiscovered = true;
684       try {
685         versionNumber = NodeUtils.extractFieldValueFromObject(
686             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
687             "_version");
688       } catch (IOException exc) {
689         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
690             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
691         return;
692       }
693     } else {
694       /*
695        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
696        * return.
697        */
698       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
699           String.valueOf(esGetResult.getOperationResult().getResultCode()));
700       return;
701     }
702
703     try {
704       String jsonPayload = null;
705       if (wasEntryDiscovered) {
706         try {
707           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
708           NodeUtils.extractObjectsByKey(
709               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
710               "_source", sourceObject);
711
712           if (!sourceObject.isEmpty()) {
713             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
714             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
715             ObjectReader updater = mapper.readerForUpdating(me);
716             MergableEntity merged = updater.readValue(icer.getAsJson());
717             jsonPayload = mapper.writeValueAsString(merged);
718           }
719         } catch (IOException exc) {
720           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
721               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
722           return;
723         }
724       } else {
725         jsonPayload = icer.getAsJson();
726       }
727
728       if (wasEntryDiscovered) {
729         if (versionNumber != null && jsonPayload != null) {
730
731                 String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
732                     icer.getId(), jsonPayload);
733
734           NetworkTransaction transactionTracker = new NetworkTransaction();
735           transactionTracker.setEntityType(esGetResult.getEntityType());
736           transactionTracker.setDescriptor(esGetResult.getDescriptor());
737           transactionTracker.setOperationType(HttpMethod.PUT);
738
739           esWorkOnHand.incrementAndGet();
740           supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
741                   requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
742                       .whenComplete((result, error) -> {
743
744                     esWorkOnHand.decrementAndGet();
745
746                     if (error != null) {
747                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
748                     } else {
749                       updateElasticSearchCounters(result);
750                       processStoreDocumentResult(result, esGetResult, icer);
751                     }
752                   });
753         }
754
755       } else {
756         if (link != null && jsonPayload != null) {
757
758           NetworkTransaction updateElasticTxn = new NetworkTransaction();
759           updateElasticTxn.setLink(link);
760           updateElasticTxn.setEntityType(esGetResult.getEntityType());
761           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
762           updateElasticTxn.setOperationType(HttpMethod.PUT);
763
764           esWorkOnHand.incrementAndGet();
765           supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
766                   esPutExecutor).whenComplete((result, error) -> {
767
768                 esWorkOnHand.decrementAndGet();
769
770                 if (error != null) {
771                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
772                 } else {
773                   updateElasticSearchCounters(result);
774                   processStoreDocumentResult(result, esGetResult, icer);
775                 }
776               });
777         }
778       }
779     } catch (Exception exc) {
780       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
781     }
782   }
783
784   /**
785    * Process store document result.
786    *
787    * @param esPutResult the es put result
788    * @param esGetResult the es get result
789    * @param icer the icer
790    */
791   private void processStoreDocumentResult(NetworkTransaction esPutResult,
792       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
793
794     OperationResult or = esPutResult.getOperationResult();
795
796     if (!or.wasSuccessful()) {
797       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
798
799         if (shouldAllowRetry(icer.getId())) {
800
801           esWorkOnHand.incrementAndGet();
802
803           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
804           retryQueue.push(rsc);
805
806           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
807         }
808       } else {
809         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
810             or.getResult());
811       }
812     }
813   }
814
815   /**
816    * Perform retry sync.
817    */
818   private void performRetrySync() {
819     while (retryQueue.peek() != null) {
820
821       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
822       if (rsc != null) {
823
824         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
825         NetworkTransaction txn = rsc.getNetworkTransaction();
826
827         final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result,icer);
828         performRetrySync(icer.getId(), networkTransactionConsumer, txn);
829
830       }
831     }
832   }
833
834   /**
835    * Should allow retry.
836    *
837    * @param id the id
838    * @return true, if successful
839    */
840   private boolean shouldAllowRetry(String id) {
841     boolean isRetryAllowed = true;
842     if (retryLimitTracker.get(id) != null) {
843       Integer currentCount = retryLimitTracker.get(id);
844       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
845         isRetryAllowed = false;
846         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
847       } else {
848         Integer newCount = new Integer(currentCount.intValue() + 1);
849         retryLimitTracker.put(id, newCount);
850       }
851
852     } else {
853       Integer firstRetryCount = new Integer(1);
854       retryLimitTracker.put(id, firstRetryCount);
855     }
856
857     return isRetryAllowed;
858   }
859
860   /**
861    * Gets the populated document.
862    *
863    * @param entityNode the entity node
864    * @param resultDescriptor the result descriptor
865    * @return the populated document
866    * @throws JsonProcessingException the json processing exception
867    * @throws IOException Signals that an I/O exception has occurred.
868    */
869   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
870       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
871
872     IndexableCrossEntityReference icer = new IndexableCrossEntityReference();
873
874     icer.setEntityType(resultDescriptor.getEntityName());
875
876     List<String> primaryKeyValues = new ArrayList<String>();
877     String pkeyValue = null;
878
879     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
880       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
881       if (pkeyValue != null) {
882         primaryKeyValues.add(pkeyValue);
883       } else {
884         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
885       }
886     }
887
888     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
889     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
890
891     return icer;
892
893   }
894 }