Adding UI extensibility
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / crossentityreference / sync / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.crossentityreference.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.ExecutorService;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.CrossEntityReference;
44 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceDescriptor;
45 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup;
46 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
47 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
48 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
49 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
50 import org.onap.aai.sparky.dal.NetworkTransaction;
51 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
52 import org.onap.aai.sparky.dal.elasticsearch.config.ElasticSearchConfig;
53 import org.onap.aai.sparky.dal.rest.HttpMethod;
54 import org.onap.aai.sparky.logging.AaiUiMsgs;
55 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
56 import org.onap.aai.sparky.sync.IndexSynchronizer;
57 import org.onap.aai.sparky.sync.SynchronizerConstants;
58 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
59 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
60 import org.onap.aai.sparky.sync.entity.IndexableCrossEntityReference;
61 import org.onap.aai.sparky.sync.entity.MergableEntity;
62 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
63 import org.onap.aai.sparky.sync.enumeration.OperationState;
64 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
65 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
66 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
67 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
68 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
69 import org.onap.aai.sparky.util.NodeUtils;
70 import org.slf4j.MDC;
71
72 import com.fasterxml.jackson.core.JsonProcessingException;
73 import com.fasterxml.jackson.databind.JsonNode;
74 import com.fasterxml.jackson.databind.ObjectReader;
75 import com.fasterxml.jackson.databind.node.ArrayNode;
76
77 /**
78  * The Class CrossEntityReferenceSynchronizer.
79  */
80 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
81     implements IndexSynchronizer {
82
83   /**
84    * The Class RetryCrossEntitySyncContainer.
85    */
86   private class RetryCrossEntitySyncContainer {
87     NetworkTransaction txn;
88     IndexableCrossEntityReference icer;
89
90     /**
91      * Instantiates a new retry cross entity sync container.
92      *
93      * @param txn the txn
94      * @param icer the icer
95      */
96     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
97         IndexableCrossEntityReference icer) {
98       this.txn = txn;
99       this.icer = icer;
100     }
101
102     public NetworkTransaction getNetworkTransaction() {
103       return txn;
104     }
105
106     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
107       return icer;
108     }
109   }
110
111   private static final Logger LOG =
112       LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);
113
114   private static final String SERVICE_INSTANCE = "service-instance";
115
116   private Deque<SelfLinkDescriptor> selflinks;
117   private Deque<RetryCrossEntitySyncContainer> retryQueue;
118   private Map<String, Integer> retryLimitTracker;
119   private boolean isAllWorkEnumerated;
120   protected ExecutorService esPutExecutor;
121
122
123   /**
124    * Instantiates a new cross entity reference synchronizer.
125    *
126    * @param indexName the index name
127    * @throws Exception the exception
128    */
129   public CrossEntityReferenceSynchronizer(ElasticSearchSchemaConfig schemaConfig,
130       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
131       NetworkStatisticsConfig esStatConfig) throws Exception {
132     super(LOG, "CERS", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
133         aaiStatConfig, esStatConfig);
134     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
135     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
136     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
137     this.synchronizerName = "Cross Reference Entity Synchronizer";
138     this.isAllWorkEnumerated = false;
139     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
140     this.aaiEntityStats.intializeEntityCounters(
141         CrossEntityReferenceLookup.getInstance().getCrossReferenceEntityDescriptors().keySet());
142
143     this.esEntityStats.intializeEntityCounters(
144         CrossEntityReferenceLookup.getInstance().getCrossReferenceEntityDescriptors().keySet());
145     this.syncDurationInMs = -1;
146   }
147
148   /*
149    * (non-Javadoc)
150    * 
151    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
152    */
153   @Override
154   public OperationState doSync() {
155     this.syncDurationInMs = -1;
156     String txnID = NodeUtils.getRandomTxnId();
157     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
158
159     resetCounters();
160     syncStartedTimeStampInMs = System.currentTimeMillis();
161     launchSyncFlow();
162     return OperationState.OK;
163   }
164
165   @Override
166   public SynchronizerState getState() {
167     if (!isSyncDone()) {
168       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
169     }
170
171     return SynchronizerState.IDLE;
172   }
173
174   /*
175    * (non-Javadoc)
176    * 
177    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
178    */
179   @Override
180   public String getStatReport(boolean showFinalReport) {
181     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
182     return getStatReport(syncDurationInMs, showFinalReport);
183   }
184
185   /*
186    * (non-Javadoc)
187    * 
188    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
189    */
190   @Override
191   public void shutdown() {
192     this.shutdownExecutors();
193   }
194
195   @Override
196   protected boolean isSyncDone() {
197     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
198
199     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
200       return false;
201     }
202
203     return true;
204   }
205
206   /**
207    * Launch sync flow.
208    *
209    * @return the operation state
210    */
211   private OperationState launchSyncFlow() {
212     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
213     Map<String, CrossEntityReferenceDescriptor> descriptorMap =
214         CrossEntityReferenceLookup.getInstance().getCrossReferenceEntityDescriptors();
215
216     if (descriptorMap.isEmpty()) {
217       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
218
219       return OperationState.ERROR;
220     }
221
222     Collection<String> syncTypes = descriptorMap.keySet();
223
224     try {
225
226       /*
227        * launch a parallel async thread to process the documents for each entity-type (to max the of
228        * the configured executor anyway)
229        */
230
231       aaiWorkOnHand.set(syncTypes.size());
232
233       for (String key : syncTypes) {
234
235         supplyAsync(new Supplier<Void>() {
236
237           @Override
238           public Void get() {
239             MDC.setContextMap(contextMap);
240             OperationResult typeLinksResult = null;
241             try {
242               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
243               aaiWorkOnHand.decrementAndGet();
244               processEntityTypeSelfLinks(typeLinksResult);
245             } catch (Exception exc) {
246               // TODO -> LOG, what should be logged here?
247             }
248
249             return null;
250           }
251
252         }, aaiExecutor).whenComplete((result, error) -> {
253           if (error != null) {
254             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
255           }
256         });
257       }
258
259       while (aaiWorkOnHand.get() != 0) {
260
261         if (LOG.isDebugEnabled()) {
262           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
263         }
264
265         Thread.sleep(1000);
266       }
267
268       aaiWorkOnHand.set(selflinks.size());
269       isAllWorkEnumerated = true;
270       performSync();
271
272       while (!isSyncDone()) {
273         performRetrySync();
274         Thread.sleep(1000);
275       }
276
277       /*
278        * Make sure we don't hang on to retries that failed which could cause issues during future
279        * syncs
280        */
281       retryLimitTracker.clear();
282
283     } catch (Exception exc) {
284       // TODO -> LOG, waht should be logged here?
285     }
286
287     return OperationState.OK;
288   }
289
290   /**
291    * Perform sync.
292    */
293   private void performSync() {
294     while (selflinks.peek() != null) {
295
296       SelfLinkDescriptor linkDescriptor = selflinks.poll();
297       aaiWorkOnHand.decrementAndGet();
298
299       CrossEntityReferenceDescriptor descriptor = null;
300
301       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
302
303         descriptor = CrossEntityReferenceLookup.getInstance().getCrossReferenceEntityDescriptors()
304             .get(linkDescriptor.getEntityType());
305
306         if (descriptor == null) {
307           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
308           // go to next element in iterator
309           continue;
310         }
311
312         if (descriptor.hasCrossEntityReferences()) {
313
314           NetworkTransaction txn = new NetworkTransaction();
315           txn.setDescriptor(descriptor);
316           txn.setLink(linkDescriptor.getSelfLink());
317           txn.setQueryParameters(linkDescriptor.getDepthModifier());
318           txn.setOperationType(HttpMethod.GET);
319           txn.setEntityType(linkDescriptor.getEntityType());
320
321           aaiWorkOnHand.incrementAndGet();
322
323           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
324               .whenComplete((result, error) -> {
325
326                 aaiWorkOnHand.decrementAndGet();
327
328                 if (error != null) {
329                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
330                 } else {
331                   if (result == null) {
332                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
333                   } else {
334                     updateActiveInventoryCounters(result);
335                     fetchDocumentForUpsert(result);
336                   }
337                 }
338               });
339         }
340       }
341     }
342   }
343
344   /**
345    * Process entity type self links.
346    *
347    * @param operationResult the operation result
348    */
349   private void processEntityTypeSelfLinks(OperationResult operationResult) {
350
351     JsonNode rootNode = null;
352
353     final String jsonResult = operationResult.getResult();
354
355     if (jsonResult != null && jsonResult.length() > 0) {
356
357       try {
358         rootNode = mapper.readTree(jsonResult);
359       } catch (IOException exc) {
360         // TODO // TODO -> LOG, waht should be logged here?
361       }
362
363       JsonNode resultData = rootNode.get("result-data");
364       ArrayNode resultDataArrayNode = null;
365
366       CrossEntityReferenceLookup cerLookup = CrossEntityReferenceLookup.getInstance();
367
368       if (resultData.isArray()) {
369         resultDataArrayNode = (ArrayNode) resultData;
370
371         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
372         JsonNode element = null;
373
374         while (elementIterator.hasNext()) {
375           element = elementIterator.next();
376
377           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
378           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
379
380           CrossEntityReferenceDescriptor descriptor = null;
381
382           if (resourceType != null && resourceLink != null) {
383             descriptor = cerLookup.getCrossReferenceEntityDescriptors().get(resourceType);
384
385             if (descriptor == null) {
386               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
387               // go to next element in iterator
388               continue;
389             }
390             if (descriptor.hasCrossEntityReferences()) {
391               selflinks.add(new SelfLinkDescriptor(resourceLink,
392                   SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
393             }
394           }
395         }
396       }
397     }
398   }
399
400
401
402   /**
403    * By providing the entity type and a json node for the entity, determine the primary key name(s)
404    * + primary key value(s) sufficient to build an entity query string of the following format:
405    * 
406    * <entityType>.<primaryKeyNames>:<primaryKeyValues>
407    * 
408    * @return - a composite string in the above format or null
409    */
410   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
411
412     OxmEntityDescriptor entityDescriptor =
413         OxmEntityLookup.getInstance().getEntityDescriptors().get(entityType);
414
415     String queryString = null;
416
417     if (entityDescriptor != null) {
418
419       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeNames();
420       final List<String> keyValues = new ArrayList<String>();
421       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
422
423       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames, "/") + ":"
424           + NodeUtils.concatArray(keyValues);
425
426     }
427
428     return queryString;
429
430
431   }
432
433   /**
434    * Fetch document for upsert.
435    *
436    * @param txn the txn
437    */
438   private void fetchDocumentForUpsert(NetworkTransaction txn) {
439
440     if (!txn.getOperationResult().wasSuccessful()) {
441       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
442       return;
443     }
444
445     CrossEntityReferenceDescriptor cerDescriptor = CrossEntityReferenceLookup.getInstance()
446         .getCrossReferenceEntityDescriptors().get(txn.getDescriptor().getEntityName());
447
448     if (cerDescriptor != null && cerDescriptor.hasCrossEntityReferences()) {
449
450       final String jsonResult = txn.getOperationResult().getResult();
451
452       if (jsonResult != null && jsonResult.length() > 0) {
453
454         /**
455          * Here's what we are going to do:
456          * 
457          * <li>Extract primary key name and value from the parent type.
458          * <li>Extract the primary key and value from the nested child instance.
459          * <li>Build a generic query to discover the self-link for the nested-child-instance using
460          * parent and child.
461          * <li>Set the self-link on the child.
462          * <li>Generate the id that will allow the elastic-search upsert to work.
463          * <li>Rinse and repeat.
464          */
465
466         CrossEntityReference cerDefinition = cerDescriptor.getCrossEntityReference();
467
468         if (cerDefinition != null) {
469           JsonNode convertedNode = null;
470           try {
471             convertedNode =
472                 NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
473
474             final String parentEntityQueryString =
475                 determineEntityQueryString(txn.getEntityType(), convertedNode);
476
477             List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
478
479             NodeUtils.extractFieldValuesFromObject(convertedNode,
480                 cerDefinition.getReferenceAttributes(), extractedParentEntityAttributeValues);
481
482             List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
483             NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
484                 nestedTargetEntityInstances);
485
486             for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
487
488               if (cerDescriptor != null) {
489
490                 String childEntityType = cerDefinition.getTargetEntityType();
491
492                 List<String> childPrimaryKeyNames = cerDescriptor.getPrimaryKeyAttributeNames();
493
494                 List<String> childKeyValues = new ArrayList<String>();
495                 NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames,
496                     childKeyValues);
497
498                 String childEntityQueryKeyString =
499                     childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames, "/") + ":"
500                         + NodeUtils.concatArray(childKeyValues);
501
502                 /**
503                  * Build generic-query to query child instance self-link from AAI
504                  */
505                 List<String> orderedQueryKeyParams = new ArrayList<String>();
506
507                 /**
508                  * At present, there is an issue with resolving the self-link using the
509                  * generic-query with nothing more than the service-instance identifier and the
510                  * service-subscription. There is another level of detail we don't have access to
511                  * unless we parse it out of the service-subscription self-link, which is a coupling
512                  * I would like to avoid. Fortunately, there is a workaround, but only for
513                  * service-instances, which is presently our only use-case for the
514                  * cross-entity-reference in R1707. Going forwards hopefully there will be other
515                  * ways to resolve a child self-link using parental embedded meta data that we don't
516                  * currently have.
517                  * 
518                  * The work-around with the service-instance entity-type is that it's possible to
519                  * request the self-link using only the service-instance-id because of a historical
520                  * AAI functional query requirement that it be possible to query a service-instance
521                  * only by it's service-instance-id. This entity type is the only one in the system
522                  * that can be queried this way which makes it a very limited workaround, but good
523                  * enough for the current release.
524                  */
525
526                 if (SERVICE_INSTANCE.equals(childEntityType)) {
527                   orderedQueryKeyParams.clear();
528                   orderedQueryKeyParams.add(childEntityQueryKeyString);
529                 } else {
530                   orderedQueryKeyParams.add(parentEntityQueryString);
531                   orderedQueryKeyParams.add(childEntityQueryKeyString);
532                 }
533
534                 String genericQueryStr = null;
535                 try {
536                   genericQueryStr =
537                       aaiAdapter.getGenericQueryForSelfLink(childEntityType, orderedQueryKeyParams);
538
539                   if (genericQueryStr != null) {
540                     aaiWorkOnHand.incrementAndGet();
541
542                     OperationResult aaiQueryResult = aaiAdapter.queryActiveInventoryWithRetries(
543                         genericQueryStr, "application/json", aaiAdapter.getNumRequestRetries());
544
545                     aaiWorkOnHand.decrementAndGet();
546
547                     if (aaiQueryResult != null && aaiQueryResult.wasSuccessful()) {
548
549                       Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
550                       JsonNode genericQueryResult = null;
551                       try {
552                         genericQueryResult =
553                             NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
554
555                         if (genericQueryResult != null) {
556
557                           NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link",
558                               entityLinks);
559
560                           String selfLink = null;
561
562                           if (entityLinks.size() != 1) {
563                             /**
564                              * an ambiguity exists where we can't reliably determine the self link,
565                              * this should be a permanent error
566                              */
567                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY,
568                                 String.valueOf(entityLinks.size()));
569                           } else {
570                             selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
571
572                             SearchableEntityLookup searchableEntityLookup =
573                                 SearchableEntityLookup.getInstance();
574
575                             SearchableOxmEntityDescriptor searchableDescriptor =
576                                 searchableEntityLookup.getSearchableEntityDescriptors()
577                                     .get(txn.getEntityType());
578
579                             if (searchableDescriptor != null
580                                 && searchableDescriptor.getSearchableAttributes().size() > 0) {
581
582                               IndexableCrossEntityReference icer =
583                                   getPopulatedDocument(targetEntityInstance, cerDescriptor);
584
585                               for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
586                                 icer.addCrossEntityReferenceValue(
587                                     parentCrossEntityReferenceAttributeValue);
588                               }
589
590                               icer.setLink(ActiveInventoryConfig.extractResourcePath(selfLink));
591
592                               icer.deriveFields();
593
594                               String link = null;
595                               try {
596                                 link = getElasticFullUrl("/" + icer.getId(), getIndexName());
597                               } catch (Exception exc) {
598                                 LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
599                                     exc.getLocalizedMessage());
600                               }
601
602                               if (link != null) {
603                                 NetworkTransaction n2 = new NetworkTransaction();
604                                 n2.setLink(link);
605                                 n2.setEntityType(txn.getEntityType());
606                                 n2.setDescriptor(txn.getDescriptor());
607                                 n2.setOperationType(HttpMethod.GET);
608
609                                 esWorkOnHand.incrementAndGet();
610
611                                 supplyAsync(
612                                     new PerformElasticSearchRetrieval(n2, elasticSearchAdapter),
613                                     esExecutor).whenComplete((result, error) -> {
614
615                                       esWorkOnHand.decrementAndGet();
616
617                                       if (error != null) {
618                                         LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED,
619                                             error.getLocalizedMessage());
620                                       } else {
621                                         updateElasticSearchCounters(result);
622                                         performDocumentUpsert(result, icer);
623                                       }
624                                     });
625                               }
626                             }
627                           }
628                         } else {
629                           LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
630                         }
631
632                       } catch (Exception exc) {
633                         LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(),
634                             exc.getLocalizedMessage());
635                       }
636
637                     } else {
638                       String message = "Entity sync failed because AAI query failed with error "
639                           + aaiQueryResult.getResult();
640                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
641                     }
642
643                   } else {
644                     String message =
645                         "Entity Sync failed because generic query str could not be determined.";
646                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
647                   }
648                 } catch (Exception exc) {
649                   String message =
650                       "Failed to sync entity because generation of generic query failed with error = "
651                           + exc.getMessage();
652                   LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
653                 }
654
655               }
656             }
657
658           } catch (IOException ioe) {
659             LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
660           }
661         }
662
663       }
664
665     } else {
666       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
667     }
668   }
669
670   /**
671    * Perform document upsert.
672    *
673    * @param esGetResult the es get result
674    * @param icer the icer
675    */
676   protected void performDocumentUpsert(NetworkTransaction esGetResult,
677       IndexableCrossEntityReference icer) {
678     /**
679      * <p>
680      * <ul>
681      * As part of the response processing we need to do the following:
682      * <li>1. Extract the version (if present), it will be the ETAG when we use the
683      * Search-Abstraction-Service
684      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
685      * tag
686      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
687      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
688      * </ul>
689      * </p>
690      */
691     String link = null;
692     try {
693       link = getElasticFullUrl("/" + icer.getId(), getIndexName());
694     } catch (Exception exc) {
695       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
696       return;
697     }
698
699     boolean wasEntryDiscovered = false;
700     String versionNumber = null;
701     if (esGetResult.getOperationResult().getResultCode() == 404) {
702       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
703     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
704       wasEntryDiscovered = true;
705       try {
706         versionNumber = NodeUtils.extractFieldValueFromObject(
707             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
708             "_version");
709       } catch (IOException exc) {
710         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
711             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
712         return;
713       }
714     } else {
715       /*
716        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
717        * return.
718        */
719       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
720           String.valueOf(esGetResult.getOperationResult().getResultCode()));
721       return;
722     }
723
724     try {
725       String jsonPayload = null;
726       if (wasEntryDiscovered) {
727         try {
728           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
729           NodeUtils.extractObjectsByKey(
730               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
731               "_source", sourceObject);
732
733           if (!sourceObject.isEmpty()) {
734             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
735             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
736             ObjectReader updater = mapper.readerForUpdating(me);
737             MergableEntity merged = updater.readValue(icer.getAsJson());
738             jsonPayload = mapper.writeValueAsString(merged);
739           }
740         } catch (IOException exc) {
741           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
742               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
743           return;
744         }
745       } else {
746         jsonPayload = icer.getAsJson();
747       }
748
749       if (wasEntryDiscovered) {
750         if (versionNumber != null && jsonPayload != null) {
751
752           String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(
753               getIndexName(), ElasticSearchConfig.getConfig().getType(), icer.getId(),
754               versionNumber, jsonPayload);
755
756           NetworkTransaction transactionTracker = new NetworkTransaction();
757           transactionTracker.setEntityType(esGetResult.getEntityType());
758           transactionTracker.setDescriptor(esGetResult.getDescriptor());
759           transactionTracker.setOperationType(HttpMethod.PUT);
760
761           esWorkOnHand.incrementAndGet();
762           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
763               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
764                   .whenComplete((result, error) -> {
765
766                     esWorkOnHand.decrementAndGet();
767
768                     if (error != null) {
769                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
770                     } else {
771                       updateElasticSearchCounters(result);
772                       processStoreDocumentResult(result, esGetResult, icer);
773                     }
774                   });
775         }
776
777       } else {
778         if (link != null && jsonPayload != null) {
779
780           NetworkTransaction updateElasticTxn = new NetworkTransaction();
781           updateElasticTxn.setLink(link);
782           updateElasticTxn.setEntityType(esGetResult.getEntityType());
783           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
784           updateElasticTxn.setOperationType(HttpMethod.PUT);
785
786           esWorkOnHand.incrementAndGet();
787           supplyAsync(
788               new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
789               esPutExecutor).whenComplete((result, error) -> {
790
791                 esWorkOnHand.decrementAndGet();
792
793                 if (error != null) {
794                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
795                 } else {
796                   updateElasticSearchCounters(result);
797                   processStoreDocumentResult(result, esGetResult, icer);
798                 }
799               });
800         }
801       }
802     } catch (Exception exc) {
803       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
804     }
805   }
806
807   /**
808    * Process store document result.
809    *
810    * @param esPutResult the es put result
811    * @param esGetResult the es get result
812    * @param icer the icer
813    */
814   private void processStoreDocumentResult(NetworkTransaction esPutResult,
815       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
816
817     OperationResult or = esPutResult.getOperationResult();
818
819     if (!or.wasSuccessful()) {
820       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
821
822         if (shouldAllowRetry(icer.getId())) {
823
824           esWorkOnHand.incrementAndGet();
825
826           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
827           retryQueue.push(rsc);
828
829           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
830         }
831       } else {
832         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
833             or.getResult());
834       }
835     }
836   }
837
838   /**
839    * Perform retry sync.
840    */
841   private void performRetrySync() {
842     while (retryQueue.peek() != null) {
843
844       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
845       if (rsc != null) {
846
847         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
848         NetworkTransaction txn = rsc.getNetworkTransaction();
849
850         String link = null;
851         try {
852           // In this retry flow the icer object has already
853           // derived its fields
854           link = getElasticFullUrl("/" + icer.getId(), getIndexName());
855         } catch (Exception exc) {
856           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
857         }
858
859         if (link != null) {
860           NetworkTransaction retryTransaction = new NetworkTransaction();
861           retryTransaction.setLink(link);
862           retryTransaction.setEntityType(txn.getEntityType());
863           retryTransaction.setDescriptor(txn.getDescriptor());
864           retryTransaction.setOperationType(HttpMethod.GET);
865
866           /*
867            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
868            * that for this request already when queuing the failed PUT!
869            */
870
871           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
872               esExecutor).whenComplete((result, error) -> {
873
874                 esWorkOnHand.decrementAndGet();
875
876                 if (error != null) {
877                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
878                 } else {
879                   updateElasticSearchCounters(result);
880                   performDocumentUpsert(result, icer);
881                 }
882               });
883         }
884
885       }
886     }
887   }
888
889   /**
890    * Should allow retry.
891    *
892    * @param id the id
893    * @return true, if successful
894    */
895   private boolean shouldAllowRetry(String id) {
896     boolean isRetryAllowed = true;
897     if (retryLimitTracker.get(id) != null) {
898       Integer currentCount = retryLimitTracker.get(id);
899       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
900         isRetryAllowed = false;
901         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
902       } else {
903         Integer newCount = new Integer(currentCount.intValue() + 1);
904         retryLimitTracker.put(id, newCount);
905       }
906
907     } else {
908       Integer firstRetryCount = new Integer(1);
909       retryLimitTracker.put(id, firstRetryCount);
910     }
911
912     return isRetryAllowed;
913   }
914
915   /**
916    * Gets the populated document.
917    *
918    * @param entityNode the entity node
919    * @param resultDescriptor the result descriptor
920    * @return the populated document
921    * @throws JsonProcessingException the json processing exception
922    * @throws IOException Signals that an I/O exception has occurred.
923    */
924   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
925       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
926
927     IndexableCrossEntityReference icer = new IndexableCrossEntityReference();
928
929     icer.setEntityType(resultDescriptor.getEntityName());
930
931     List<String> primaryKeyValues = new ArrayList<String>();
932     String pkeyValue = null;
933
934     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
935       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
936       if (pkeyValue != null) {
937         primaryKeyValues.add(pkeyValue);
938       } else {
939         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
940       }
941     }
942
943     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
944     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
945
946     return icer;
947
948   }
949 }