Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / synchronizer / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.synchronizer;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.net.InetAddress;
29 import java.net.UnknownHostException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Deque;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentLinkedDeque;
38 import java.util.concurrent.ExecutorService;
39 import java.util.function.Supplier;
40
41 import org.onap.aai.sparky.config.oxm.CrossEntityReference;
42 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
43 import org.onap.aai.sparky.dal.NetworkTransaction;
44 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
45 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.dal.rest.OperationResult;
48 import org.onap.aai.sparky.logging.AaiUiMsgs;
49 import org.onap.aai.sparky.synchronizer.config.SynchronizerConfiguration;
50 import org.onap.aai.sparky.synchronizer.entity.IndexableCrossEntityReference;
51 import org.onap.aai.sparky.synchronizer.entity.MergableEntity;
52 import org.onap.aai.sparky.synchronizer.entity.SelfLinkDescriptor;
53 import org.onap.aai.sparky.synchronizer.enumeration.OperationState;
54 import org.onap.aai.sparky.synchronizer.enumeration.SynchronizerState;
55 import org.onap.aai.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
56 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchPut;
57 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchRetrieval;
58 import org.onap.aai.sparky.synchronizer.task.PerformElasticSearchUpdate;
59 import org.onap.aai.sparky.util.NodeUtils;
60 import org.onap.aai.cl.api.Logger;
61 import org.onap.aai.cl.eelf.LoggerFactory;
62 import org.slf4j.MDC;
63
64 import org.onap.aai.cl.mdc.MdcContext;
65 import com.fasterxml.jackson.core.JsonProcessingException;
66 import com.fasterxml.jackson.databind.JsonNode;
67 import com.fasterxml.jackson.databind.ObjectReader;
68 import com.fasterxml.jackson.databind.node.ArrayNode;
69
70 /**
71  * The Class CrossEntityReferenceSynchronizer.
72  */
73 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
74     implements IndexSynchronizer {
75
76   /**
77    * The Class RetryCrossEntitySyncContainer.
78    */
79   private class RetryCrossEntitySyncContainer {
80     NetworkTransaction txn;
81     IndexableCrossEntityReference icer;
82
83     /**
84      * Instantiates a new retry cross entity sync container.
85      *
86      * @param txn the txn
87      * @param icer the icer
88      */
89     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
90         IndexableCrossEntityReference icer) {
91       this.txn = txn;
92       this.icer = icer;
93     }
94
95     public NetworkTransaction getNetworkTransaction() {
96       return txn;
97     }
98
99     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
100       return icer;
101     }
102   }
103
104   private static final Logger LOG =
105       LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);
106
107   private static final String SERVICE_INSTANCE = "service-instance";
108   private Deque<SelfLinkDescriptor> selflinks;
109   private Deque<RetryCrossEntitySyncContainer> retryQueue;
110   private Map<String, Integer> retryLimitTracker;
111   private boolean isAllWorkEnumerated;
112   protected ExecutorService esPutExecutor;
113   protected ActiveInventoryConfig aaiConfig;
114
115   /**
116    * Instantiates a new cross entity reference synchronizer.
117    *
118    * @param indexName the index name
119    * @throws Exception the exception
120    */
121   public CrossEntityReferenceSynchronizer(String indexName, ActiveInventoryConfig aaiConfig)
122       throws Exception {
123     super(LOG, "CERS", 2, 5, 5, indexName);
124     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
125     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
126     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
127     this.synchronizerName = "Cross Reference Entity Synchronizer";
128     this.isAllWorkEnumerated = false;
129     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
130     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
131         oxmModelLoader.getCrossReferenceEntityDescriptors());
132     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
133         oxmModelLoader.getCrossReferenceEntityDescriptors());
134     this.aaiConfig = aaiConfig;
135     this.syncDurationInMs = -1;
136   }
137
138   /*
139    * (non-Javadoc)
140    * 
141    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#doSync()
142    */
143   @Override
144   public OperationState doSync() {
145     this.syncDurationInMs = -1;
146     String txnID = NodeUtils.getRandomTxnId();
147     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
148
149     resetCounters();
150     syncStartedTimeStampInMs = System.currentTimeMillis();
151     launchSyncFlow();
152     return OperationState.OK;
153   }
154
155   @Override
156   public SynchronizerState getState() {
157     if (!isSyncDone()) {
158       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
159     }
160
161     return SynchronizerState.IDLE;
162   }
163
164   /*
165    * (non-Javadoc)
166    * 
167    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
168    */
169   @Override
170   public String getStatReport(boolean showFinalReport) {
171     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
172     return getStatReport(syncDurationInMs, showFinalReport);
173   }
174
175   /*
176    * (non-Javadoc)
177    * 
178    * @see org.onap.aai.sparky.synchronizer.IndexSynchronizer#shutdown()
179    */
180   @Override
181   public void shutdown() {
182     this.shutdownExecutors();
183   }
184
185   @Override
186   protected boolean isSyncDone() {
187     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
188
189     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
190       return false;
191     }
192
193     return true;
194   }
195
196   /**
197    * Launch sync flow.
198    *
199    * @return the operation state
200    */
201   private OperationState launchSyncFlow() {
202     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
203     Map<String, OxmEntityDescriptor> descriptorMap =
204         oxmModelLoader.getCrossReferenceEntityDescriptors();
205
206     if (descriptorMap.isEmpty()) {
207       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
208
209       return OperationState.ERROR;
210     }
211
212     Collection<String> syncTypes = descriptorMap.keySet();
213
214     try {
215
216       /*
217        * launch a parallel async thread to process the documents for each entity-type (to max the of
218        * the configured executor anyway)
219        */
220
221       aaiWorkOnHand.set(syncTypes.size());
222
223       for (String key : syncTypes) {
224
225         supplyAsync(new Supplier<Void>() {
226
227           @Override
228           public Void get() {
229             MDC.setContextMap(contextMap);
230             OperationResult typeLinksResult = null;
231             try {
232               typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(key);
233               aaiWorkOnHand.decrementAndGet();
234               processEntityTypeSelfLinks(typeLinksResult);
235             } catch (Exception exc) {
236               // TODO -> LOG, what should be logged here?
237             }
238
239             return null;
240           }
241
242         }, aaiExecutor).whenComplete((result, error) -> {
243           if (error != null) {
244             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
245           }
246         });
247       }
248
249       while (aaiWorkOnHand.get() != 0) {
250
251         if (LOG.isDebugEnabled()) {
252           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
253         }
254
255         Thread.sleep(1000);
256       }
257
258       aaiWorkOnHand.set(selflinks.size());
259       isAllWorkEnumerated = true;
260       performSync();
261
262       while (!isSyncDone()) {
263         performRetrySync();
264         Thread.sleep(1000);
265       }
266
267       /*
268        * Make sure we don't hang on to retries that failed which could cause issues during future
269        * syncs
270        */
271       retryLimitTracker.clear();
272
273     } catch (Exception exc) {
274       // TODO -> LOG, waht should be logged here?
275     }
276
277     return OperationState.OK;
278   }
279
280   /**
281    * Perform sync.
282    */
283   private void performSync() {
284     while (selflinks.peek() != null) {
285
286       SelfLinkDescriptor linkDescriptor = selflinks.poll();
287       aaiWorkOnHand.decrementAndGet();
288
289       OxmEntityDescriptor descriptor = null;
290
291       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
292
293         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
294
295         if (descriptor == null) {
296           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
297           // go to next element in iterator
298           continue;
299         }
300
301         if (descriptor.hasCrossEntityReferences()) {
302
303           NetworkTransaction txn = new NetworkTransaction();
304           txn.setDescriptor(descriptor);
305           txn.setLink(linkDescriptor.getSelfLink() + linkDescriptor.getDepthModifier());
306           txn.setOperationType(HttpMethod.GET);
307           txn.setEntityType(linkDescriptor.getEntityType());
308
309           aaiWorkOnHand.incrementAndGet();
310
311           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
312               .whenComplete((result, error) -> {
313
314                 aaiWorkOnHand.decrementAndGet();
315
316                 if (error != null) {
317                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
318                 } else {
319                   if (result == null) {
320                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
321                   } else {
322                     updateActiveInventoryCounters(result);
323                     fetchDocumentForUpsert(result);
324                   }
325                 }
326               });
327         }
328       }
329     }
330   }
331
332   /**
333    * Process entity type self links.
334    *
335    * @param operationResult the operation result
336    */
337   private void processEntityTypeSelfLinks(OperationResult operationResult) {
338
339     JsonNode rootNode = null;
340
341     final String jsonResult = operationResult.getResult();
342
343     if (jsonResult != null && jsonResult.length() > 0) {
344
345       try {
346         rootNode = mapper.readTree(jsonResult);
347       } catch (IOException exc) {
348         String message = "Could not deserialize JSON (representing operation result) as node tree. "
349             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
350         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
351         return;
352       }
353
354       JsonNode resultData = rootNode.get("result-data");
355       ArrayNode resultDataArrayNode = null;
356
357       if (resultData.isArray()) {
358         resultDataArrayNode = (ArrayNode) resultData;
359
360         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
361         JsonNode element = null;
362
363         while (elementIterator.hasNext()) {
364           element = elementIterator.next();
365
366           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
367           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
368
369           OxmEntityDescriptor descriptor = null;
370
371           if (resourceType != null && resourceLink != null) {
372             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
373
374             if (descriptor == null) {
375               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
376               // go to next element in iterator
377               continue;
378             }
379             if (descriptor.hasCrossEntityReferences()) {
380               selflinks.add(new SelfLinkDescriptor(resourceLink,
381                   SynchronizerConfiguration.DEPTH_ALL_MODIFIER, resourceType));
382             }
383           }
384         }
385       }
386     }
387   }
388
389
390
391   /**
392    * By providing the entity type and a json node for the entity, determine the primary key name(s)
393    * + primary key value(s) sufficient to build an entity query string of the following format:
394    * 
395    * <entityType>.<primaryKeyNames>:<primaryKeyValues>
396    * 
397    * @return - a composite string in the above format or null
398    */
399   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
400
401     OxmEntityDescriptor entityDescriptor = oxmModelLoader.getEntityDescriptor(entityType);
402
403     String queryString = null;
404
405     if (entityDescriptor != null) {
406
407       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeName();
408       final List<String> keyValues = new ArrayList<String>();
409       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
410
411       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames, "/") + ":"
412           + NodeUtils.concatArray(keyValues);
413
414     }
415
416     return queryString;
417
418
419   }
420
421   /**
422    * Fetch document for upsert.
423    *
424    * @param txn the txn
425    */
426   private void fetchDocumentForUpsert(NetworkTransaction txn) {
427
428     if (!txn.getOperationResult().wasSuccessful()) {
429       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
430       return;
431     }
432
433     if (txn.getDescriptor().hasCrossEntityReferences()) {
434
435       final String jsonResult = txn.getOperationResult().getResult();
436
437       if (jsonResult != null && jsonResult.length() > 0) {
438
439         /**
440          * Here's what we are going to do:
441          * 
442          * <li>Extract primary key name and value from the parent type.
443          * <li>Extract the primary key and value from the nested child instance.
444          * <li>Build a generic query to discover the self-link for the nested-child-instance using
445          * parent and child.
446          * <li>Set the self-link on the child.
447          * <li>Generate the id that will allow the elastic-search upsert to work.
448          * <li>Rinse and repeat.
449          */
450
451         OxmEntityDescriptor parentEntityDescriptor =
452             oxmModelLoader.getEntityDescriptor(txn.getEntityType());
453
454         if (parentEntityDescriptor != null) {
455
456           CrossEntityReference cerDefinition = parentEntityDescriptor.getCrossEntityReference();
457
458           if (cerDefinition != null) {
459             JsonNode convertedNode = null;
460             try {
461               convertedNode =
462                   NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
463
464               final String parentEntityQueryString =
465                   determineEntityQueryString(txn.getEntityType(), convertedNode);
466
467               List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
468
469               NodeUtils.extractFieldValuesFromObject(convertedNode,
470                   cerDefinition.getReferenceAttributes(), extractedParentEntityAttributeValues);
471
472               List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
473               NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
474                   nestedTargetEntityInstances);
475
476               for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
477
478                 OxmEntityDescriptor cerDescriptor = oxmModelLoader
479                     .getSearchableEntityDescriptor(cerDefinition.getTargetEntityType());
480
481                 if (cerDescriptor != null) {
482
483                   String childEntityType = cerDefinition.getTargetEntityType();
484
485                   List<String> childPrimaryKeyNames = cerDescriptor.getPrimaryKeyAttributeName();
486
487                   List<String> childKeyValues = new ArrayList<String>();
488                   NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames,
489                       childKeyValues);
490
491                   String childEntityQueryKeyString =
492                       childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames, "/") + ":"
493                           + NodeUtils.concatArray(childKeyValues);
494
495                   /**
496                    * Build generic-query to query child instance self-link from AAI
497                    */
498                   List<String> orderedQueryKeyParams = new ArrayList<String>();
499                   if (SERVICE_INSTANCE.equals(childEntityType)) {
500                     orderedQueryKeyParams.clear();
501                     orderedQueryKeyParams.add(childEntityQueryKeyString);
502                   } else {
503                     orderedQueryKeyParams.add(parentEntityQueryString);
504                     orderedQueryKeyParams.add(childEntityQueryKeyString);
505                   }
506                   String genericQueryStr = null;
507                   try {
508                     genericQueryStr = aaiDataProvider.getGenericQueryForSelfLink(childEntityType,
509                         orderedQueryKeyParams);
510
511                     if (genericQueryStr != null) {
512                       aaiWorkOnHand.incrementAndGet();
513                       OperationResult aaiQueryResult = aaiDataProvider
514                           .queryActiveInventoryWithRetries(genericQueryStr, "application/json",
515                               aaiConfig.getAaiRestConfig().getNumRequestRetries());
516                       aaiWorkOnHand.decrementAndGet();
517                       if (aaiQueryResult != null && aaiQueryResult.wasSuccessful()) {
518
519                         Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
520                         JsonNode genericQueryResult = null;
521                         try {
522                           genericQueryResult =
523                               NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
524
525                           if (genericQueryResult != null) {
526
527                             NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link",
528                                 entityLinks);
529
530                             String selfLink = null;
531
532                             if (entityLinks.size() != 1) {
533                               /**
534                                * an ambiguity exists where we can't reliably determine the self
535                                * link, this should be a permanent error
536                                */
537                               LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY,
538                                   String.valueOf(entityLinks.size()));
539                             } else {
540                               selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
541
542                               if (!cerDescriptor.getSearchableAttributes().isEmpty()) {
543
544                                 IndexableCrossEntityReference icer =
545                                     getPopulatedDocument(targetEntityInstance, cerDescriptor);
546
547                                 for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
548                                   icer.addCrossEntityReferenceValue(
549                                       parentCrossEntityReferenceAttributeValue);
550                                 }
551
552                                 icer.setLink(ActiveInventoryConfig.extractResourcePath(selfLink));
553
554                                 icer.deriveFields();
555
556                                 String link = null;
557                                 try {
558                                   link = getElasticFullUrl("/" + icer.getId(), getIndexName());
559                                 } catch (Exception exc) {
560                                   LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
561                                       exc.getLocalizedMessage());
562                                 }
563
564                                 if (link != null) {
565                                   NetworkTransaction n2 = new NetworkTransaction();
566                                   n2.setLink(link);
567                                   n2.setEntityType(txn.getEntityType());
568                                   n2.setDescriptor(txn.getDescriptor());
569                                   n2.setOperationType(HttpMethod.GET);
570
571                                   esWorkOnHand.incrementAndGet();
572
573                                   supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider),
574                                       esExecutor).whenComplete((result, error) -> {
575
576                                         esWorkOnHand.decrementAndGet();
577
578                                         if (error != null) {
579                                           LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED,
580                                               error.getLocalizedMessage());
581                                         } else {
582                                           updateElasticSearchCounters(result);
583                                           performDocumentUpsert(result, icer);
584                                         }
585                                       });
586                                 }
587                               }
588                             }
589                           } else {
590                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
591                           }
592
593                         } catch (Exception exc) {
594                           LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(),
595                               exc.getLocalizedMessage());
596                         }
597
598                       } else {
599                         String message = "Entity sync failed because AAI query failed with error ";
600                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
601                       }
602
603                     } else {
604                       String message =
605                           "Entity Sync failed because generic query str could not be determined.";
606                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
607                     }
608                   } catch (Exception exc) {
609                     String message =
610                         "Failed to sync entity because generation of generic query failed with error = "
611                             + exc.getMessage();
612                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
613                   }
614
615                 }
616               }
617
618             } catch (IOException ioe) {
619               LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
620             }
621           }
622
623         } else {
624           LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
625         }
626       }
627     }
628   }
629
630   /**
631    * Perform document upsert.
632    *
633    * @param esGetResult the es get result
634    * @param icer the icer
635    */
636   protected void performDocumentUpsert(NetworkTransaction esGetResult,
637       IndexableCrossEntityReference icer) {
638     /**
639      * <p>
640      * <ul>
641      * As part of the response processing we need to do the following:
642      * <li>1. Extract the version (if present), it will be the ETAG when we use the
643      * Search-Abstraction-Service
644      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
645      * tag
646      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
647      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
648      * </ul>
649      * </p>
650      */
651     String link = null;
652     try {
653       link = getElasticFullUrl("/" + icer.getId(), getIndexName());
654     } catch (Exception exc) {
655       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
656       return;
657     }
658
659     boolean wasEntryDiscovered = false;
660     String versionNumber = null;
661     if (esGetResult.getOperationResult().getResultCode() == 404) {
662       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
663     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
664       wasEntryDiscovered = true;
665       try {
666         versionNumber = NodeUtils.extractFieldValueFromObject(
667             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
668             "_version");
669       } catch (IOException exc) {
670         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
671             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
672         return;
673       }
674     } else {
675       /*
676        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
677        * return.
678        */
679       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
680           String.valueOf(esGetResult.getOperationResult().getResultCode()));
681       return;
682     }
683
684     try {
685       String jsonPayload = null;
686       if (wasEntryDiscovered) {
687         try {
688           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
689           NodeUtils.extractObjectsByKey(
690               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
691               "_source", sourceObject);
692
693           if (!sourceObject.isEmpty()) {
694             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
695             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
696             ObjectReader updater = mapper.readerForUpdating(me);
697             MergableEntity merged = updater.readValue(icer.getIndexDocumentJson());
698             jsonPayload = mapper.writeValueAsString(merged);
699           }
700         } catch (IOException exc) {
701           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
702               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
703           return;
704         }
705       } else {
706         jsonPayload = icer.getIndexDocumentJson();
707       }
708
709       if (wasEntryDiscovered) {
710         if (versionNumber != null && jsonPayload != null) {
711
712           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
713               ElasticSearchConfig.getConfig().getType(), icer.getId(), versionNumber, jsonPayload);
714
715           NetworkTransaction transactionTracker = new NetworkTransaction();
716           transactionTracker.setEntityType(esGetResult.getEntityType());
717           transactionTracker.setDescriptor(esGetResult.getDescriptor());
718           transactionTracker.setOperationType(HttpMethod.PUT);
719
720           esWorkOnHand.incrementAndGet();
721           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
722               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
723                   .whenComplete((result, error) -> {
724
725                     esWorkOnHand.decrementAndGet();
726
727                     if (error != null) {
728                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
729                     } else {
730                       updateElasticSearchCounters(result);
731                       processStoreDocumentResult(result, esGetResult, icer);
732                     }
733                   });
734         }
735
736       } else {
737         if (link != null && jsonPayload != null) {
738
739           NetworkTransaction updateElasticTxn = new NetworkTransaction();
740           updateElasticTxn.setLink(link);
741           updateElasticTxn.setEntityType(esGetResult.getEntityType());
742           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
743           updateElasticTxn.setOperationType(HttpMethod.PUT);
744
745           esWorkOnHand.incrementAndGet();
746           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
747               esPutExecutor).whenComplete((result, error) -> {
748
749                 esWorkOnHand.decrementAndGet();
750
751                 if (error != null) {
752                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
753                 } else {
754                   updateElasticSearchCounters(result);
755                   processStoreDocumentResult(result, esGetResult, icer);
756                 }
757               });
758         }
759       }
760     } catch (Exception exc) {
761       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
762     }
763   }
764
765   /**
766    * Process store document result.
767    *
768    * @param esPutResult the es put result
769    * @param esGetResult the es get result
770    * @param icer the icer
771    */
772   private void processStoreDocumentResult(NetworkTransaction esPutResult,
773       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
774
775     OperationResult or = esPutResult.getOperationResult();
776
777     if (!or.wasSuccessful()) {
778       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
779
780         if (shouldAllowRetry(icer.getId())) {
781
782           esWorkOnHand.incrementAndGet();
783
784           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
785           retryQueue.push(rsc);
786
787           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
788         }
789       } else {
790         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
791             or.getResult());
792       }
793     }
794   }
795
796   /**
797    * Perform retry sync.
798    */
799   private void performRetrySync() {
800     while (retryQueue.peek() != null) {
801
802       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
803       if (rsc != null) {
804
805         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
806         NetworkTransaction txn = rsc.getNetworkTransaction();
807
808         String link = null;
809         try {
810           // In this retry flow the icer object has already
811           // derived its fields
812           link = getElasticFullUrl("/" + icer.getId(), getIndexName());
813         } catch (Exception exc) {
814           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
815         }
816
817         if (link != null) {
818           NetworkTransaction retryTransaction = new NetworkTransaction();
819           retryTransaction.setLink(link);
820           retryTransaction.setEntityType(txn.getEntityType());
821           retryTransaction.setDescriptor(txn.getDescriptor());
822           retryTransaction.setOperationType(HttpMethod.GET);
823
824           /*
825            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
826            * that for this request already when queuing the failed PUT!
827            */
828
829           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
830               esExecutor).whenComplete((result, error) -> {
831
832                 esWorkOnHand.decrementAndGet();
833
834                 if (error != null) {
835                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
836                 } else {
837                   updateElasticSearchCounters(result);
838                   performDocumentUpsert(result, icer);
839                 }
840               });
841         }
842
843       }
844     }
845   }
846
847   /**
848    * Should allow retry.
849    *
850    * @param id the id
851    * @return true, if successful
852    */
853   private boolean shouldAllowRetry(String id) {
854     boolean isRetryAllowed = true;
855     if (retryLimitTracker.get(id) != null) {
856       Integer currentCount = retryLimitTracker.get(id);
857       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
858         isRetryAllowed = false;
859         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
860       } else {
861         Integer newCount = new Integer(currentCount.intValue() + 1);
862         retryLimitTracker.put(id, newCount);
863       }
864
865     } else {
866       Integer firstRetryCount = new Integer(1);
867       retryLimitTracker.put(id, firstRetryCount);
868     }
869
870     return isRetryAllowed;
871   }
872
873   /**
874    * Gets the populated document.
875    *
876    * @param entityNode the entity node
877    * @param resultDescriptor the result descriptor
878    * @return the populated document
879    * @throws JsonProcessingException the json processing exception
880    * @throws IOException Signals that an I/O exception has occurred.
881    */
882   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
883       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
884
885     IndexableCrossEntityReference icer = new IndexableCrossEntityReference(oxmModelLoader);
886
887     icer.setEntityType(resultDescriptor.getEntityName());
888
889     List<String> primaryKeyValues = new ArrayList<String>();
890     String pkeyValue = null;
891
892     for (String keyName : resultDescriptor.getPrimaryKeyAttributeName()) {
893       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
894       if (pkeyValue != null) {
895         primaryKeyValues.add(pkeyValue);
896       } else {
897         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
898       }
899     }
900
901     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
902     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
903
904     return icer;
905
906   }
907 }