c1da401ce4e9a793ede89e34ab24559199719aa7
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.Deque;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ExecutorService;
42 import java.util.function.Supplier;
43
44 import org.openecomp.cl.api.Logger;
45 import org.openecomp.cl.eelf.LoggerFactory;
46 import org.openecomp.sparky.config.oxm.CrossEntityReference;
47 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
48 import org.openecomp.sparky.dal.NetworkTransaction;
49 import org.openecomp.sparky.dal.aai.config.ActiveInventoryConfig;
50 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
51 import org.openecomp.sparky.dal.rest.HttpMethod;
52 import org.openecomp.sparky.dal.rest.OperationResult;
53 import org.openecomp.sparky.logging.AaiUiMsgs;
54 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
55 import org.openecomp.sparky.synchronizer.entity.IndexableCrossEntityReference;
56 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
57 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
58 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
59 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
60 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
61 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
62 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
63 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
64 import org.openecomp.sparky.util.NodeUtils;
65 import org.slf4j.MDC;
66
67 import org.openecomp.cl.mdc.MdcContext;
68 import com.fasterxml.jackson.core.JsonProcessingException;
69 import com.fasterxml.jackson.databind.JsonNode;
70 import com.fasterxml.jackson.databind.ObjectReader;
71 import com.fasterxml.jackson.databind.node.ArrayNode;
72
73 /**
74  * The Class CrossEntityReferenceSynchronizer.
75  */
76 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
77     implements IndexSynchronizer {
78
79   /**
80    * The Class RetryCrossEntitySyncContainer.
81    */
82   private class RetryCrossEntitySyncContainer {
83     NetworkTransaction txn;
84     IndexableCrossEntityReference icer;
85
86     /**
87      * Instantiates a new retry cross entity sync container.
88      *
89      * @param txn the txn
90      * @param icer the icer
91      */
92     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
93         IndexableCrossEntityReference icer) {
94       this.txn = txn;
95       this.icer = icer;
96     }
97
98     public NetworkTransaction getNetworkTransaction() {
99       return txn;
100     }
101
102     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
103       return icer;
104     }
105   }
106
107   private static final Logger LOG =
108       LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);
109   
110   private static final String SERVICE_INSTANCE = "service-instance";
111   private Deque<SelfLinkDescriptor> selflinks;
112   private Deque<RetryCrossEntitySyncContainer> retryQueue;
113   private Map<String, Integer> retryLimitTracker;
114   private boolean isAllWorkEnumerated;
115   protected ExecutorService esPutExecutor;
116   protected ActiveInventoryConfig aaiConfig;
117
118   /**
119    * Instantiates a new cross entity reference synchronizer.
120    *
121    * @param indexName the index name
122    * @throws Exception the exception
123    */
124   public CrossEntityReferenceSynchronizer(String indexName, ActiveInventoryConfig aaiConfig) throws Exception {
125     super(LOG, "CERS", 2, 5, 5, indexName);
126     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
127     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
128     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
129     this.synchronizerName = "Cross Reference Entity Synchronizer";
130     this.isAllWorkEnumerated = false;
131     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
132     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
133         oxmModelLoader.getCrossReferenceEntityDescriptors());
134     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
135         oxmModelLoader.getCrossReferenceEntityDescriptors());
136     this.aaiConfig = aaiConfig;
137     this.syncDurationInMs = -1;
138   }
139
140   /* (non-Javadoc)
141    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
142    */
143   @Override
144   public OperationState doSync() {
145         this.syncDurationInMs = -1;
146         String txnID = NodeUtils.getRandomTxnId();
147     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
148         
149     resetCounters();
150     syncStartedTimeStampInMs = System.currentTimeMillis();
151     launchSyncFlow();
152     return OperationState.OK;
153   }
154
155   @Override
156   public SynchronizerState getState() {
157     if (!isSyncDone()) {
158       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
159     }
160
161     return SynchronizerState.IDLE;
162   }
163
164   /* (non-Javadoc)
165    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
166    */
167   @Override
168   public String getStatReport(boolean showFinalReport) {
169           syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
170           return getStatReport(syncDurationInMs, showFinalReport);
171   }
172
173   /* (non-Javadoc)
174    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
175    */
176   @Override
177   public void shutdown() {
178     this.shutdownExecutors();
179   }
180
181   @Override
182   protected boolean isSyncDone() {
183     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
184
185     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
186       return false;
187     }
188
189     return true;
190   }
191
192   /**
193    * Launch sync flow.
194    *
195    * @return the operation state
196    */
197   private OperationState launchSyncFlow() {
198         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
199     Map<String, OxmEntityDescriptor> descriptorMap =
200         oxmModelLoader.getCrossReferenceEntityDescriptors();
201
202     if (descriptorMap.isEmpty()) {
203       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
204
205       return OperationState.ERROR;
206     }
207
208     Collection<String> syncTypes = descriptorMap.keySet();
209
210     try {
211
212       /*
213        * launch a parallel async thread to process the documents for each entity-type (to max the of
214        * the configured executor anyway)
215        */
216
217       aaiWorkOnHand.set(syncTypes.size());
218
219       for (String key : syncTypes) {
220
221         supplyAsync(new Supplier<Void>() {
222
223           @Override
224           public Void get() {
225                 MDC.setContextMap(contextMap);
226             OperationResult typeLinksResult = null;
227             try {
228               typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
229               aaiWorkOnHand.decrementAndGet();
230               processEntityTypeSelfLinks(typeLinksResult);
231             } catch (Exception exc) {
232               // TODO -> LOG, what should be logged here?
233             }
234
235             return null;
236           }
237
238         }, aaiExecutor).whenComplete((result, error) -> {
239           if (error != null) {
240             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
241           }
242         });
243       }
244
245       while (aaiWorkOnHand.get() != 0) {
246
247         if (LOG.isDebugEnabled()) {
248           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
249         }
250
251         Thread.sleep(1000);
252       }
253
254       aaiWorkOnHand.set(selflinks.size());
255       isAllWorkEnumerated = true;
256       performSync();
257
258       while (!isSyncDone()) {
259         performRetrySync();
260         Thread.sleep(1000);
261       }
262
263       /*
264        * Make sure we don't hang on to retries that failed which could cause issues during future
265        * syncs
266        */
267       retryLimitTracker.clear();
268
269     } catch (Exception exc) {
270       // TODO -> LOG, waht should be logged here?
271     }
272
273     return OperationState.OK;
274   }
275
276   /**
277    * Perform sync.
278    */
279   private void performSync() {
280     while (selflinks.peek() != null) {
281
282       SelfLinkDescriptor linkDescriptor = selflinks.poll();
283       aaiWorkOnHand.decrementAndGet();
284
285       OxmEntityDescriptor descriptor = null;
286
287       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
288
289         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
290
291         if (descriptor == null) {
292           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
293           // go to next element in iterator
294           continue;
295         }
296
297         if (descriptor.hasCrossEntityReferences()) {
298
299           NetworkTransaction txn = new NetworkTransaction();
300           txn.setDescriptor(descriptor);
301           txn.setLink(linkDescriptor.getSelfLink() + linkDescriptor.getDepthModifier());
302           txn.setOperationType(HttpMethod.GET);
303           txn.setEntityType(linkDescriptor.getEntityType());
304
305           aaiWorkOnHand.incrementAndGet();
306
307           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
308               .whenComplete((result, error) -> {
309
310                 aaiWorkOnHand.decrementAndGet();
311
312                 if (error != null) {
313                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
314                 } else {
315                   if (result == null) {
316                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
317                   } else {
318                     updateActiveInventoryCounters(result);
319                     fetchDocumentForUpsert(result);
320                   }
321                 }
322               });
323         }
324       }
325     }
326   }
327
328   /**
329    * Process entity type self links.
330    *
331    * @param operationResult the operation result
332    */
333   private void processEntityTypeSelfLinks(OperationResult operationResult) {
334
335     JsonNode rootNode = null;
336
337     final String jsonResult = operationResult.getResult();
338
339     if (jsonResult != null && jsonResult.length() > 0) {
340
341       try {
342         rootNode = mapper.readTree(jsonResult);
343       } catch (IOException exc) {
344         // TODO // TODO -> LOG, waht should be logged here?
345       }
346
347       JsonNode resultData = rootNode.get("result-data");
348       ArrayNode resultDataArrayNode = null;
349
350       if (resultData.isArray()) {
351         resultDataArrayNode = (ArrayNode) resultData;
352
353         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
354         JsonNode element = null;
355
356         while (elementIterator.hasNext()) {
357           element = elementIterator.next();
358
359           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
360           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
361
362           OxmEntityDescriptor descriptor = null;
363
364           if (resourceType != null && resourceLink != null) {
365             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
366
367             if (descriptor == null) {
368               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
369               // go to next element in iterator
370               continue;
371             }
372             if (descriptor.hasCrossEntityReferences()) {
373               selflinks.add(new SelfLinkDescriptor(
374                   resourceLink,SynchronizerConfiguration.DEPTH_ALL_MODIFIER, resourceType));
375             }
376           }
377         }
378       }
379     }
380   }
381
382   
383   
384   /**
385    * By providing the entity type and a json node for the entity, determine the
386    * primary key name(s) + primary key value(s) sufficient to build an entity query string
387    * of the following format:
388    * 
389    *      <entityType>.<primaryKeyNames>:<primaryKeyValues>
390    * 
391    * @return - a composite string in the above format or null
392    */
393   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
394     
395     OxmEntityDescriptor entityDescriptor =
396         oxmModelLoader.getEntityDescriptor(entityType);
397     
398     String queryString = null;
399     
400     if ( entityDescriptor != null ) {
401
402       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeName();
403       final List<String> keyValues = new ArrayList<String>();
404       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
405
406       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames,"/") + ":" + NodeUtils.concatArray(keyValues);
407
408     } 
409     
410     return queryString;
411
412     
413   }
414   
415   /**
416    * Fetch document for upsert.
417    *
418    * @param txn the txn
419    */
420   private void fetchDocumentForUpsert(NetworkTransaction txn) {
421     
422     if (!txn.getOperationResult().wasSuccessful()) {
423       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
424       return;
425     }
426
427     if (txn.getDescriptor().hasCrossEntityReferences()) {
428
429       final String jsonResult = txn.getOperationResult().getResult();
430       
431       if (jsonResult != null && jsonResult.length() > 0) {
432         
433         /**
434          * Here's what we are going to do:
435          * 
436          * <li>Extract primary key name and value from the parent type.
437          * <li>Extract the primary key and value from the nested child instance.
438          * <li>Build a generic query to discover the self-link for the nested-child-instance using
439          * parent and child.
440          * <li>Set the self-link on the child.
441          * <li>Generate the id that will allow the elastic-search upsert to work.
442          * <li>Rinse and repeat.
443          */
444
445         OxmEntityDescriptor parentEntityDescriptor =
446             oxmModelLoader.getEntityDescriptor(txn.getEntityType());
447         
448         if ( parentEntityDescriptor != null ) {
449           
450           CrossEntityReference cerDefinition = parentEntityDescriptor.getCrossEntityReference();
451
452           if (cerDefinition != null) {
453             JsonNode convertedNode = null;
454             try {
455               convertedNode = NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
456               
457               final String parentEntityQueryString = determineEntityQueryString(txn.getEntityType(), convertedNode);
458               
459               List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
460
461               NodeUtils.extractFieldValuesFromObject(convertedNode,
462                   cerDefinition.getReferenceAttributes(),
463                   extractedParentEntityAttributeValues);
464
465               List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
466               NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
467                   nestedTargetEntityInstances);
468
469               for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
470
471                 OxmEntityDescriptor cerDescriptor =
472                     oxmModelLoader.getSearchableEntityDescriptor(cerDefinition.getTargetEntityType());
473
474                 if (cerDescriptor != null) {
475                   
476                   String childEntityType = cerDefinition.getTargetEntityType();
477                   
478                   List<String> childPrimaryKeyNames = cerDescriptor.getPrimaryKeyAttributeName();
479                   
480                   List<String> childKeyValues = new ArrayList<String>();
481                   NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames, childKeyValues);
482                   
483                   String childEntityQueryKeyString = childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames,"/") + ":" + NodeUtils.concatArray(childKeyValues);
484                   
485                   /**
486                    * Build generic-query to query child instance self-link from AAI
487                    */
488                   List<String> orderedQueryKeyParams = new ArrayList<String>();
489                   if (SERVICE_INSTANCE.equals(childEntityType)) {
490                       orderedQueryKeyParams.clear();
491                       orderedQueryKeyParams.add(childEntityQueryKeyString);
492                     } else {
493                       orderedQueryKeyParams.add(parentEntityQueryString);
494                       orderedQueryKeyParams.add(childEntityQueryKeyString);
495                     }
496                   String genericQueryStr = null;
497                   try {
498                     genericQueryStr = aaiDataProvider.getGenericQueryForSelfLink(childEntityType, orderedQueryKeyParams);
499                     
500                     if (genericQueryStr != null) {
501                       aaiWorkOnHand.incrementAndGet();
502                       OperationResult aaiQueryResult = aaiDataProvider.queryActiveInventoryWithRetries(
503                           genericQueryStr, "application/json",
504                           aaiConfig.getAaiRestConfig().getNumRequestRetries());
505                       aaiWorkOnHand.decrementAndGet();
506                       if (aaiQueryResult!= null && aaiQueryResult.wasSuccessful()) {
507                         
508                         Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
509                         JsonNode genericQueryResult = null;
510                         try {
511                           genericQueryResult = NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
512                           
513                           if ( genericQueryResult != null ) {
514                             
515                             NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link", entityLinks);
516
517                             String selfLink = null;
518
519                             if (entityLinks.size() != 1) {
520                               /**
521                                * an ambiguity exists where we can't reliably determine the self
522                                * link, this should be a permanent error
523                                */
524                               LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY, String.valueOf(entityLinks.size()));
525                             } else {
526                               selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
527                               
528                               if (!cerDescriptor.getSearchableAttributes().isEmpty()) {
529
530                                 IndexableCrossEntityReference icer =
531                                     getPopulatedDocument(targetEntityInstance, cerDescriptor);
532
533                                 for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
534                                   icer.addCrossEntityReferenceValue(
535                                       parentCrossEntityReferenceAttributeValue);
536                                 }
537                                 
538                                 icer.setLink(ActiveInventoryConfig.extractResourcePath(selfLink));
539
540                                 icer.deriveFields();
541
542                                 String link = null;
543                                 try {
544                                   link = getElasticFullUrl("/" + icer.getId(), getIndexName());
545                                 } catch (Exception exc) {
546                                   LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
547                                 }
548
549                                 if (link != null) {
550                                   NetworkTransaction n2 = new NetworkTransaction();
551                                   n2.setLink(link);
552                                   n2.setEntityType(txn.getEntityType());
553                                   n2.setDescriptor(txn.getDescriptor());
554                                   n2.setOperationType(HttpMethod.GET);
555
556                                   esWorkOnHand.incrementAndGet();
557
558                                   supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider),
559                                       esExecutor).whenComplete((result, error) -> {
560
561                                         esWorkOnHand.decrementAndGet();
562
563                                         if (error != null) {
564                                           LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
565                                         } else {
566                                           updateElasticSearchCounters(result);
567                                           performDocumentUpsert(result, icer);
568                                         }
569                                       });
570                                 }
571                               }
572                             }
573                           } else {
574                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
575                           }
576
577                         } catch (Exception exc) {
578                           LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(), exc.getLocalizedMessage());
579                         }
580                         
581                       } else {
582                         String message = "Entity sync failed because AAI query failed with error " + aaiQueryResult.getResult(); 
583                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
584                       }
585                       
586                     } else {
587                       String message = "Entity Sync failed because generic query str could not be determined.";
588                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
589                     }
590                   } catch (Exception exc) {
591                     String message = "Failed to sync entity because generation of generic query failed with error = " + exc.getMessage();
592                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
593                   }
594                   
595                 }
596               }
597               
598             } catch (IOException ioe) {
599               LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
600             }
601           }
602           
603         } else {
604           LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
605         }
606       }
607     }
608   }
609
610   /**
611    * Perform document upsert.
612    *
613    * @param esGetResult the es get result
614    * @param icer the icer
615    */
616   protected void performDocumentUpsert(NetworkTransaction esGetResult,
617       IndexableCrossEntityReference icer) {
618     /**
619      * <p>
620      * <ul>
621      * As part of the response processing we need to do the following:
622      * <li>1. Extract the version (if present), it will be the ETAG when we use the
623      * Search-Abstraction-Service
624      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
625      * tag
626      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
627      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
628      * </ul>
629      * </p>
630      */
631     String link = null;
632     try {
633       link = getElasticFullUrl("/" + icer.getId(), getIndexName());
634     } catch (Exception exc) {
635       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
636       return;
637     }
638
639     boolean wasEntryDiscovered = false;
640     String versionNumber = null;
641     if (esGetResult.getOperationResult().getResultCode() == 404) {
642       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
643     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
644       wasEntryDiscovered = true;
645       try {
646         versionNumber = NodeUtils.extractFieldValueFromObject(
647             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
648             "_version");
649       } catch (IOException exc) {
650         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
651             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
652         return;
653       }
654     } else {
655       /*
656        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
657        * return.
658        */
659       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
660           String.valueOf(esGetResult.getOperationResult().getResultCode()));
661       return;
662     }
663
664     try {
665       String jsonPayload = null;
666       if (wasEntryDiscovered) {
667         try {
668           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
669           NodeUtils.extractObjectsByKey(
670               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
671               "_source", sourceObject);
672
673           if (!sourceObject.isEmpty()) {
674             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
675             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
676             ObjectReader updater = mapper.readerForUpdating(me);
677             MergableEntity merged = updater.readValue(icer.getIndexDocumentJson());
678             jsonPayload = mapper.writeValueAsString(merged);
679           }
680         } catch (IOException exc) {
681           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
682               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
683           return;
684         }
685       } else {
686         jsonPayload = icer.getIndexDocumentJson();
687       }
688
689       if (wasEntryDiscovered) {
690         if (versionNumber != null && jsonPayload != null) {
691
692           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
693               ElasticSearchConfig.getConfig().getType(), icer.getId(), versionNumber, jsonPayload);
694
695           NetworkTransaction transactionTracker = new NetworkTransaction();
696           transactionTracker.setEntityType(esGetResult.getEntityType());
697           transactionTracker.setDescriptor(esGetResult.getDescriptor());
698           transactionTracker.setOperationType(HttpMethod.PUT);
699
700           esWorkOnHand.incrementAndGet();
701           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
702               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
703                   .whenComplete((result, error) -> {
704
705                     esWorkOnHand.decrementAndGet();
706
707                     if (error != null) {
708                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
709                     } else {
710                       updateElasticSearchCounters(result);
711                       processStoreDocumentResult(result, esGetResult, icer);
712                     }
713                   });
714         }
715
716       } else {
717         if (link != null && jsonPayload != null) {
718
719           NetworkTransaction updateElasticTxn = new NetworkTransaction();
720           updateElasticTxn.setLink(link);
721           updateElasticTxn.setEntityType(esGetResult.getEntityType());
722           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
723           updateElasticTxn.setOperationType(HttpMethod.PUT);
724
725           esWorkOnHand.incrementAndGet();
726           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
727               esPutExecutor).whenComplete((result, error) -> {
728
729                 esWorkOnHand.decrementAndGet();
730
731                 if (error != null) {
732                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
733                 } else {
734                   updateElasticSearchCounters(result);
735                   processStoreDocumentResult(result, esGetResult, icer);
736                 }
737               });
738         }
739       }
740     } catch (Exception exc) {
741       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
742     }
743   }
744
745   /**
746    * Process store document result.
747    *
748    * @param esPutResult the es put result
749    * @param esGetResult the es get result
750    * @param icer the icer
751    */
752   private void processStoreDocumentResult(NetworkTransaction esPutResult,
753       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
754
755     OperationResult or = esPutResult.getOperationResult();
756
757     if (!or.wasSuccessful()) {
758       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
759
760         if (shouldAllowRetry(icer.getId())) {
761
762           esWorkOnHand.incrementAndGet();
763
764           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
765           retryQueue.push(rsc);
766
767           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
768         }
769       } else {
770         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
771             or.getResult());
772       }
773     }
774   }
775
776   /**
777    * Perform retry sync.
778    */
779   private void performRetrySync() {
780     while (retryQueue.peek() != null) {
781
782       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
783       if (rsc != null) {
784
785         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
786         NetworkTransaction txn = rsc.getNetworkTransaction();
787
788         String link = null;
789         try {
790           // In this retry flow the icer object has already
791           // derived its fields
792           link = getElasticFullUrl("/" + icer.getId(), getIndexName());
793         } catch (Exception exc) {
794           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
795         }
796
797         if (link != null) {
798           NetworkTransaction retryTransaction = new NetworkTransaction();
799           retryTransaction.setLink(link);
800           retryTransaction.setEntityType(txn.getEntityType());
801           retryTransaction.setDescriptor(txn.getDescriptor());
802           retryTransaction.setOperationType(HttpMethod.GET);
803
804           /*
805            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
806            * that for this request already when queuing the failed PUT!
807            */
808
809           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
810               esExecutor).whenComplete((result, error) -> {
811
812                 esWorkOnHand.decrementAndGet();
813
814                 if (error != null) {
815                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
816                 } else {
817                   updateElasticSearchCounters(result);
818                   performDocumentUpsert(result, icer);
819                 }
820               });
821         }
822
823       }
824     }
825   }
826
827   /**
828    * Should allow retry.
829    *
830    * @param id the id
831    * @return true, if successful
832    */
833   private boolean shouldAllowRetry(String id) {
834     boolean isRetryAllowed = true;
835     if (retryLimitTracker.get(id) != null) {
836       Integer currentCount = retryLimitTracker.get(id);
837       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
838         isRetryAllowed = false;
839         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
840       } else {
841         Integer newCount = new Integer(currentCount.intValue() + 1);
842         retryLimitTracker.put(id, newCount);
843       }
844
845     } else {
846       Integer firstRetryCount = new Integer(1);
847       retryLimitTracker.put(id, firstRetryCount);
848     }
849
850     return isRetryAllowed;
851   }
852
853   /**
854    * Gets the populated document.
855    *
856    * @param entityNode the entity node
857    * @param resultDescriptor the result descriptor
858    * @return the populated document
859    * @throws JsonProcessingException the json processing exception
860    * @throws IOException Signals that an I/O exception has occurred.
861    */
862   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
863       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
864
865     IndexableCrossEntityReference icer = new IndexableCrossEntityReference(oxmModelLoader);
866
867     icer.setEntityType(resultDescriptor.getEntityName());
868
869     List<String> primaryKeyValues = new ArrayList<String>();
870     String pkeyValue = null;
871
872     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
873       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
874       if (pkeyValue != null) {
875         primaryKeyValues.add(pkeyValue);
876       } else {
877         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
878       }
879     }
880
881     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
882     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
883
884     return icer;
885
886   }
887 }