Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ExecutorService;
42 import java.util.function.Supplier;
43
44 import org.openecomp.cl.api.Logger;
45 import org.openecomp.cl.eelf.LoggerFactory;
46 import org.openecomp.sparky.config.oxm.CrossEntityReference;
47 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
48 import org.openecomp.sparky.dal.NetworkTransaction;
49 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
50 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
51 import org.openecomp.sparky.dal.rest.HttpMethod;
52 import org.openecomp.sparky.dal.rest.OperationResult;
53 import org.openecomp.sparky.logging.AaiUiMsgs;
54 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
55 import org.openecomp.sparky.synchronizer.entity.IndexableCrossEntityReference;
56 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
57 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
58 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
59 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
60 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
61 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
62 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
63 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
64 import org.openecomp.sparky.util.NodeUtils;
65 import org.slf4j.MDC;
66
67 import org.openecomp.cl.mdc.MdcContext;
68 import com.fasterxml.jackson.core.JsonProcessingException;
69 import com.fasterxml.jackson.databind.JsonNode;
70 import com.fasterxml.jackson.databind.ObjectReader;
71 import com.fasterxml.jackson.databind.node.ArrayNode;
72
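/**
 * Synchronizer for cross-entity references. It collects the self-links of the entity types
 * that declare cross-entity references from Active Inventory (AAI), retrieves each linked
 * entity, builds an {@link IndexableCrossEntityReference} document for every nested target
 * entity instance, and upserts those documents into the ElasticSearch index.
 */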
76 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
77     implements IndexSynchronizer {
78
79   /**
80    * The Class RetryCrossEntitySyncContainer.
81    */
82   private class RetryCrossEntitySyncContainer {
83     NetworkTransaction txn;
84     IndexableCrossEntityReference icer;
85
86     /**
87      * Instantiates a new retry cross entity sync container.
88      *
89      * @param txn the txn
90      * @param icer the icer
91      */
92     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
93         IndexableCrossEntityReference icer) {
94       this.txn = txn;
95       this.icer = icer;
96     }
97
98     public NetworkTransaction getNetworkTransaction() {
99       return txn;
100     }
101
102     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
103       return icer;
104     }
105   }
106
  // Class-wide logger; also handed to the parent synchronizer and the named executor factory.
  private static final Logger LOG =
      LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);

  // Self-links queued by processEntityTypeSelfLinks() and drained by performSync().
  private Deque<SelfLinkDescriptor> selflinks;
  // Upserts that hit a version conflict; filled by processStoreDocumentResult() and drained by
  // performRetrySync().
  private Deque<RetryCrossEntitySyncContainer> retryQueue;
  // Cleared at the end of launchSyncFlow() so retry bookkeeping does not leak into later syncs.
  private Map<String, Integer> retryLimitTracker;
  // Set to true once all self-links have been collected; isSyncDone() stays false until then.
  private boolean isAllWorkEnumerated;
  // Executor used for the ElasticSearch PUT / bulk update tasks.
  protected ExecutorService esPutExecutor;
  // Source of the AAI REST settings (e.g. the number of request retries).
  protected ActiveInventoryConfig aaiConfig;
115   protected ActiveInventoryConfig aaiConfig;
116
117   /**
118    * Instantiates a new cross entity reference synchronizer.
119    *
120    * @param indexName the index name
121    * @throws Exception the exception
122    */
123   public CrossEntityReferenceSynchronizer(String indexName, ActiveInventoryConfig aaiConfig) throws Exception {
124     super(LOG, "CERS", 2, 5, 5, indexName);
125     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
126     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
127     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
128     this.synchronizerName = "Cross Reference Entity Synchronizer";
129     this.isAllWorkEnumerated = false;
130     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
131     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
132         oxmModelLoader.getCrossReferenceEntityDescriptors());
133     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
134         oxmModelLoader.getCrossReferenceEntityDescriptors());
135     this.aaiConfig = aaiConfig;
136   }
137
138   /* (non-Javadoc)
139    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
140    */
141   @Override
142   public OperationState doSync() {
143         String txnID = NodeUtils.getRandomTxnId();
144     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
145         
146     resetCounters();
147     syncStartedTimeStampInMs = System.currentTimeMillis();
148     launchSyncFlow();
149     return OperationState.OK;
150   }
151
152   @Override
153   public SynchronizerState getState() {
154     if (!isSyncDone()) {
155       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
156     }
157
158     return SynchronizerState.IDLE;
159   }
160
161   /* (non-Javadoc)
162    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
163    */
164   @Override
165   public String getStatReport(boolean showFinalReport) {
166     return this.getStatReport(System.currentTimeMillis() - syncStartedTimeStampInMs,
167         showFinalReport);
168   }
169
170   /* (non-Javadoc)
171    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
172    */
173   @Override
174   public void shutdown() {
175     this.shutdownExecutors();
176   }
177
178   @Override
179   protected boolean isSyncDone() {
180     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
181
182     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
183       return false;
184     }
185
186     return true;
187   }
188
189   /**
190    * Launch sync flow.
191    *
192    * @return the operation state
193    */
194   private OperationState launchSyncFlow() {
195         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
196     Map<String, OxmEntityDescriptor> descriptorMap =
197         oxmModelLoader.getCrossReferenceEntityDescriptors();
198
199     if (descriptorMap.isEmpty()) {
200       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
201
202       return OperationState.ERROR;
203     }
204
205     Collection<String> syncTypes = descriptorMap.keySet();
206
207     try {
208
209       /*
210        * launch a parallel async thread to process the documents for each entity-type (to max the of
211        * the configured executor anyway)
212        */
213
214       aaiWorkOnHand.set(syncTypes.size());
215
216       for (String key : syncTypes) {
217
218         supplyAsync(new Supplier<Void>() {
219
220           @Override
221           public Void get() {
222                 MDC.setContextMap(contextMap);
223             OperationResult typeLinksResult = null;
224             try {
225               typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
226               aaiWorkOnHand.decrementAndGet();
227               processEntityTypeSelfLinks(typeLinksResult);
228             } catch (Exception exc) {
229               // TODO -> LOG, what should be logged here?
230             }
231
232             return null;
233           }
234
235         }, aaiExecutor).whenComplete((result, error) -> {
236           if (error != null) {
237             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
238           }
239         });
240       }
241
242       while (aaiWorkOnHand.get() != 0) {
243
244         if (LOG.isDebugEnabled()) {
245           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
246         }
247
248         Thread.sleep(1000);
249       }
250
251       aaiWorkOnHand.set(selflinks.size());
252       isAllWorkEnumerated = true;
253       performSync();
254
255       while (!isSyncDone()) {
256         performRetrySync();
257         Thread.sleep(1000);
258       }
259
260       /*
261        * Make sure we don't hang on to retries that failed which could cause issues during future
262        * syncs
263        */
264       retryLimitTracker.clear();
265
266     } catch (Exception exc) {
267       // TODO -> LOG, waht should be logged here?
268     }
269
270     return OperationState.OK;
271   }
272
273   /**
274    * Perform sync.
275    */
276   private void performSync() {
277     while (selflinks.peek() != null) {
278
279       SelfLinkDescriptor linkDescriptor = selflinks.poll();
280       aaiWorkOnHand.decrementAndGet();
281
282       OxmEntityDescriptor descriptor = null;
283
284       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
285
286         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
287
288         if (descriptor == null) {
289           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
290           // go to next element in iterator
291           continue;
292         }
293
294         if (descriptor.hasCrossEntityReferences()) {
295
296           NetworkTransaction txn = new NetworkTransaction();
297           txn.setDescriptor(descriptor);
298           txn.setLink(linkDescriptor.getSelfLink() + linkDescriptor.getDepthModifier());
299           txn.setOperationType(HttpMethod.GET);
300           txn.setEntityType(linkDescriptor.getEntityType());
301
302           aaiWorkOnHand.incrementAndGet();
303
304           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
305               .whenComplete((result, error) -> {
306
307                 aaiWorkOnHand.decrementAndGet();
308
309                 if (error != null) {
310                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
311                 } else {
312                   if (result == null) {
313                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
314                   } else {
315                     updateActiveInventoryCounters(result);
316                     fetchDocumentForUpsert(result);
317                   }
318                 }
319               });
320         }
321       }
322     }
323   }
324
325   /**
326    * Process entity type self links.
327    *
328    * @param operationResult the operation result
329    */
330   private void processEntityTypeSelfLinks(OperationResult operationResult) {
331
332     JsonNode rootNode = null;
333
334     final String jsonResult = operationResult.getResult();
335
336     if (jsonResult != null && jsonResult.length() > 0) {
337
338       try {
339         rootNode = mapper.readTree(jsonResult);
340       } catch (IOException exc) {
341         // TODO // TODO -> LOG, waht should be logged here?
342       }
343
344       JsonNode resultData = rootNode.get("result-data");
345       ArrayNode resultDataArrayNode = null;
346
347       if (resultData.isArray()) {
348         resultDataArrayNode = (ArrayNode) resultData;
349
350         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
351         JsonNode element = null;
352
353         while (elementIterator.hasNext()) {
354           element = elementIterator.next();
355
356           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
357           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
358
359           OxmEntityDescriptor descriptor = null;
360
361           if (resourceType != null && resourceLink != null) {
362             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
363
364             if (descriptor == null) {
365               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
366               // go to next element in iterator
367               continue;
368             }
369             if (descriptor.hasCrossEntityReferences()) {
370               selflinks.add(new SelfLinkDescriptor(
371                   resourceLink,SynchronizerConfiguration.DEPTH_ALL_MODIFIER, resourceType));
372             }
373           }
374         }
375       }
376     }
377   }
378
379   
380   
381   /**
382    * By providing the entity type and a json node for the entity, determine the
383    * primary key name(s) + primary key value(s) sufficient to build an entity query string
384    * of the following format:
385    * 
386    *      <entityType>.<primaryKeyNames>:<primaryKeyValues>
387    * 
388    * @return - a composite string in the above format or null
389    */
390   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
391     
392     OxmEntityDescriptor entityDescriptor =
393         oxmModelLoader.getEntityDescriptor(entityType);
394     
395     String queryString = null;
396     
397     if ( entityDescriptor != null ) {
398
399       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeName();
400       final List<String> keyValues = new ArrayList<String>();
401       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
402
403       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames,"/") + ":" + NodeUtils.concatArray(keyValues);
404
405     } 
406     
407     return queryString;
408
409     
410   }
411   
412   /**
413    * Fetch document for upsert.
414    *
415    * @param txn the txn
416    */
417   private void fetchDocumentForUpsert(NetworkTransaction txn) {
418     
419     if (!txn.getOperationResult().wasSuccessful()) {
420       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
421       return;
422     }
423
424     if (txn.getDescriptor().hasCrossEntityReferences()) {
425
426       final String jsonResult = txn.getOperationResult().getResult();
427       
428       if (jsonResult != null && jsonResult.length() > 0) {
429         
430         /**
431          * Here's what we are going to do:
432          * 
433          * <li>Extract primary key name and value from the parent type.
434          * <li>Extract the primary key and value from the nested child instance.
435          * <li>Build a generic query to discover the self-link for the nested-child-instance using
436          * parent and child.
437          * <li>Set the self-link on the child.
438          * <li>Generate the id that will allow the elastic-search upsert to work.
439          * <li>Rinse and repeat.
440          */
441
442         OxmEntityDescriptor parentEntityDescriptor =
443             oxmModelLoader.getEntityDescriptor(txn.getEntityType());
444         
445         if ( parentEntityDescriptor != null ) {
446           
447           CrossEntityReference cerDefinition = parentEntityDescriptor.getCrossEntityReference();
448
449           if (cerDefinition != null) {
450             JsonNode convertedNode = null;
451             try {
452               convertedNode = NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
453               
454               final String parentEntityQueryString = determineEntityQueryString(txn.getEntityType(), convertedNode);
455               
456               List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
457
458               NodeUtils.extractFieldValuesFromObject(convertedNode,
459                   cerDefinition.getReferenceAttributes(),
460                   extractedParentEntityAttributeValues);
461
462               List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
463               NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
464                   nestedTargetEntityInstances);
465
466               for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
467
468                 OxmEntityDescriptor cerDescriptor =
469                     oxmModelLoader.getSearchableEntityDescriptor(cerDefinition.getTargetEntityType());
470
471                 if (cerDescriptor != null) {
472                   
473                   String childEntityType = cerDefinition.getTargetEntityType();
474                   
475                   List<String> childPrimaryKeyNames = cerDescriptor.getPrimaryKeyAttributeName();
476                   
477                   List<String> childKeyValues = new ArrayList<String>();
478                   NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames, childKeyValues);
479                   
480                   String childEntityQueryKeyString = childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames,"/") + ":" + NodeUtils.concatArray(childKeyValues);
481                   
482                   /**
483                    * Build generic-query to query child instance self-link from AAI
484                    */
485                   List<String> orderedQueryKeyParams = new ArrayList<String>();
486                   orderedQueryKeyParams.add(parentEntityQueryString);
487                   orderedQueryKeyParams.add(childEntityQueryKeyString);
488                   String genericQueryStr = null;
489                   try {
490                     genericQueryStr = aaiDataProvider.getGenericQueryForSelfLink(childEntityType, orderedQueryKeyParams);
491                     
492                     if (genericQueryStr != null) {
493
494                       OperationResult aaiQueryResult = aaiDataProvider.queryActiveInventoryWithRetries(
495                           genericQueryStr, "application/json",
496                           aaiConfig.getAaiRestConfig().getNumRequestRetries());
497                       
498                       if (aaiQueryResult!= null && aaiQueryResult.wasSuccessful()) {
499                         
500                         Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
501                         JsonNode genericQueryResult = null;
502                         try {
503                           genericQueryResult = NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
504                           
505                           if ( genericQueryResult != null ) {
506                             
507                             NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link", entityLinks);
508
509                             String selfLink = null;
510
511                             if (entityLinks.size() != 1) {
512                               /**
513                                * an ambiguity exists where we can't reliably determine the self
514                                * link, this should be a permanent error
515                                */
516                               LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY, String.valueOf(entityLinks.size()));
517                             } else {
518                               selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
519                               
520                               if (!cerDescriptor.getSearchableAttributes().isEmpty()) {
521
522                                 IndexableCrossEntityReference icer =
523                                     getPopulatedDocument(targetEntityInstance, cerDescriptor);
524
525                                 for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
526                                   icer.addCrossEntityReferenceValue(
527                                       parentCrossEntityReferenceAttributeValue);
528                                 }
529                                 
530                                 icer.setLink(selfLink);
531
532                                 icer.deriveFields();
533
534                                 String link = null;
535                                 try {
536                                   link = getElasticFullUrl("/" + icer.getId(), getIndexName());
537                                 } catch (Exception exc) {
538                                   LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
539                                 }
540
541                                 if (link != null) {
542                                   NetworkTransaction n2 = new NetworkTransaction();
543                                   n2.setLink(link);
544                                   n2.setEntityType(txn.getEntityType());
545                                   n2.setDescriptor(txn.getDescriptor());
546                                   n2.setOperationType(HttpMethod.GET);
547
548                                   esWorkOnHand.incrementAndGet();
549
550                                   supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider),
551                                       esExecutor).whenComplete((result, error) -> {
552
553                                         esWorkOnHand.decrementAndGet();
554
555                                         if (error != null) {
556                                           LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
557                                         } else {
558                                           updateElasticSearchCounters(result);
559                                           performDocumentUpsert(result, icer);
560                                         }
561                                       });
562                                 }
563                               }
564                             }
565                           } else {
566                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
567                           }
568
569                         } catch (Exception exc) {
570                           LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(), exc.getLocalizedMessage());
571                         }
572                         
573                       } else {
574                         String message = "Entity sync failed because AAI query failed with error " + aaiQueryResult.getResult(); 
575                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
576                       }
577                       
578                     } else {
579                       String message = "Entity Sync failed because generic query str could not be determined.";
580                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
581                     }
582                   } catch (Exception exc) {
583                     String message = "Failed to sync entity because generation of generic query failed with error = " + exc.getMessage();
584                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
585                   }
586                   
587                 }
588               }
589               
590             } catch (IOException ioe) {
591               LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
592             }
593           }
594           
595         } else {
596           LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
597         }
598       }
599     }
600   }
601
602   /**
603    * Perform document upsert.
604    *
605    * @param esGetResult the es get result
606    * @param icer the icer
607    */
608   protected void performDocumentUpsert(NetworkTransaction esGetResult,
609       IndexableCrossEntityReference icer) {
610     /**
611      * <p>
612      * <ul>
613      * As part of the response processing we need to do the following:
614      * <li>1. Extract the version (if present), it will be the ETAG when we use the
615      * Search-Abstraction-Service
616      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
617      * tag
618      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
619      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
620      * </ul>
621      * </p>
622      */
623     String link = null;
624     try {
625       link = getElasticFullUrl("/" + icer.getId(), getIndexName());
626     } catch (Exception exc) {
627       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
628       return;
629     }
630
631     boolean wasEntryDiscovered = false;
632     String versionNumber = null;
633     if (esGetResult.getOperationResult().getResultCode() == 404) {
634       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
635     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
636       wasEntryDiscovered = true;
637       try {
638         versionNumber = NodeUtils.extractFieldValueFromObject(
639             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
640             "_version");
641       } catch (IOException exc) {
642         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
643             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
644         return;
645       }
646     } else {
647       /*
648        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
649        * return.
650        */
651       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
652           String.valueOf(esGetResult.getOperationResult().getResultCode()));
653       return;
654     }
655
656     try {
657       String jsonPayload = null;
658       if (wasEntryDiscovered) {
659         try {
660           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
661           NodeUtils.extractObjectsByKey(
662               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
663               "_source", sourceObject);
664
665           if (!sourceObject.isEmpty()) {
666             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
667             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
668             ObjectReader updater = mapper.readerForUpdating(me);
669             MergableEntity merged = updater.readValue(icer.getIndexDocumentJson());
670             jsonPayload = mapper.writeValueAsString(merged);
671           }
672         } catch (IOException exc) {
673           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
674               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
675           return;
676         }
677       } else {
678         jsonPayload = icer.getIndexDocumentJson();
679       }
680
681       if (wasEntryDiscovered) {
682         if (versionNumber != null && jsonPayload != null) {
683
684           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
685               ElasticSearchConfig.getConfig().getType(), icer.getId(), versionNumber, jsonPayload);
686
687           NetworkTransaction transactionTracker = new NetworkTransaction();
688           transactionTracker.setEntityType(esGetResult.getEntityType());
689           transactionTracker.setDescriptor(esGetResult.getDescriptor());
690           transactionTracker.setOperationType(HttpMethod.PUT);
691
692           esWorkOnHand.incrementAndGet();
693           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
694               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
695                   .whenComplete((result, error) -> {
696
697                     esWorkOnHand.decrementAndGet();
698
699                     if (error != null) {
700                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
701                     } else {
702                       updateElasticSearchCounters(result);
703                       processStoreDocumentResult(result, esGetResult, icer);
704                     }
705                   });
706         }
707
708       } else {
709         if (link != null && jsonPayload != null) {
710
711           NetworkTransaction updateElasticTxn = new NetworkTransaction();
712           updateElasticTxn.setLink(link);
713           updateElasticTxn.setEntityType(esGetResult.getEntityType());
714           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
715           updateElasticTxn.setOperationType(HttpMethod.PUT);
716
717           esWorkOnHand.incrementAndGet();
718           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
719               esPutExecutor).whenComplete((result, error) -> {
720
721                 esWorkOnHand.decrementAndGet();
722
723                 if (error != null) {
724                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
725                 } else {
726                   updateElasticSearchCounters(result);
727                   processStoreDocumentResult(result, esGetResult, icer);
728                 }
729               });
730         }
731       }
732     } catch (Exception exc) {
733       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
734     }
735   }
736
737   /**
738    * Process store document result.
739    *
740    * @param esPutResult the es put result
741    * @param esGetResult the es get result
742    * @param icer the icer
743    */
744   private void processStoreDocumentResult(NetworkTransaction esPutResult,
745       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
746
747     OperationResult or = esPutResult.getOperationResult();
748
749     if (!or.wasSuccessful()) {
750       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
751
752         if (shouldAllowRetry(icer.getId())) {
753
754           esWorkOnHand.incrementAndGet();
755
756           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
757           retryQueue.push(rsc);
758
759           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
760         }
761       } else {
762         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
763             or.getResult());
764       }
765     }
766   }
767
768   /**
769    * Perform retry sync.
770    */
771   private void performRetrySync() {
772     while (retryQueue.peek() != null) {
773
774       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
775       if (rsc != null) {
776
777         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
778         NetworkTransaction txn = rsc.getNetworkTransaction();
779
780         String link = null;
781         try {
782           // In this retry flow the icer object has already
783           // derived its fields
784           link = getElasticFullUrl("/" + icer.getId(), getIndexName());
785         } catch (Exception exc) {
786           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
787         }
788
789         if (link != null) {
790           NetworkTransaction retryTransaction = new NetworkTransaction();
791           retryTransaction.setLink(link);
792           retryTransaction.setEntityType(txn.getEntityType());
793           retryTransaction.setDescriptor(txn.getDescriptor());
794           retryTransaction.setOperationType(HttpMethod.GET);
795
796           /*
797            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
798            * that for this request already when queuing the failed PUT!
799            */
800
801           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
802               esExecutor).whenComplete((result, error) -> {
803
804                 esWorkOnHand.decrementAndGet();
805
806                 if (error != null) {
807                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
808                 } else {
809                   updateElasticSearchCounters(result);
810                   performDocumentUpsert(result, icer);
811                 }
812               });
813         }
814
815       }
816     }
817   }
818
819   /**
820    * Should allow retry.
821    *
822    * @param id the id
823    * @return true, if successful
824    */
825   private boolean shouldAllowRetry(String id) {
826     boolean isRetryAllowed = true;
827     if (retryLimitTracker.get(id) != null) {
828       Integer currentCount = retryLimitTracker.get(id);
829       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
830         isRetryAllowed = false;
831         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
832       } else {
833         Integer newCount = new Integer(currentCount.intValue() + 1);
834         retryLimitTracker.put(id, newCount);
835       }
836
837     } else {
838       Integer firstRetryCount = new Integer(1);
839       retryLimitTracker.put(id, firstRetryCount);
840     }
841
842     return isRetryAllowed;
843   }
844
845   /**
846    * Gets the populated document.
847    *
848    * @param entityNode the entity node
849    * @param resultDescriptor the result descriptor
850    * @return the populated document
851    * @throws JsonProcessingException the json processing exception
852    * @throws IOException Signals that an I/O exception has occurred.
853    */
854   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
855       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
856
857     IndexableCrossEntityReference icer = new IndexableCrossEntityReference(oxmModelLoader);
858
859     icer.setEntityType(resultDescriptor.getEntityName());
860
861     List<String> primaryKeyValues = new ArrayList<String>();
862     String pkeyValue = null;
863
864     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
865       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
866       if (pkeyValue != null) {
867         primaryKeyValues.add(pkeyValue);
868       } else {
869         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
870       }
871     }
872
873     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
874     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
875
876     return icer;
877
878   }
879 }