Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / crossentityreference / sync / CrossEntityReferenceSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.crossentityreference.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Deque;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentLinkedDeque;
38 import java.util.concurrent.ExecutorService;
39 import java.util.function.Supplier;
40
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.restclient.client.OperationResult;
45 import org.onap.aai.sparky.config.oxm.CrossEntityReference;
46 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceDescriptor;
47 import org.onap.aai.sparky.config.oxm.CrossEntityReferenceLookup;
48 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
49 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
50 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
51 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
52 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
53 import org.onap.aai.sparky.dal.NetworkTransaction;
54 import org.onap.aai.sparky.dal.rest.HttpMethod;
55 import org.onap.aai.sparky.logging.AaiUiMsgs;
56 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
57 import org.onap.aai.sparky.sync.IndexSynchronizer;
58 import org.onap.aai.sparky.sync.SynchronizerConstants;
59 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
60 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
61 import org.onap.aai.sparky.sync.entity.IndexableCrossEntityReference;
62 import org.onap.aai.sparky.sync.entity.MergableEntity;
63 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
64 import org.onap.aai.sparky.sync.enumeration.OperationState;
65 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
66 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
67 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
68 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
69 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
70 import org.onap.aai.sparky.util.NodeUtils;
71 import org.slf4j.MDC;
72
73 import com.fasterxml.jackson.core.JsonProcessingException;
74 import com.fasterxml.jackson.databind.JsonNode;
75 import com.fasterxml.jackson.databind.ObjectReader;
76 import com.fasterxml.jackson.databind.node.ArrayNode;
77
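/**
 * Index synchronizer for cross-entity-reference documents. It collects the self-links of every
 * entity type that declares cross entity references, retrieves each of those entities from AAI,
 * and for every nested target entity builds an {@link IndexableCrossEntityReference} document
 * that is then merged into (or created in) the ElasticSearch index.
 */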
81 public class CrossEntityReferenceSynchronizer extends AbstractEntitySynchronizer
82     implements IndexSynchronizer {
83
84   /**
85    * The Class RetryCrossEntitySyncContainer.
86    */
87   private class RetryCrossEntitySyncContainer {
88     NetworkTransaction txn;
89     IndexableCrossEntityReference icer;
90
91     /**
92      * Instantiates a new retry cross entity sync container.
93      *
94      * @param txn the txn
95      * @param icer the icer
96      */
97     public RetryCrossEntitySyncContainer(NetworkTransaction txn,
98         IndexableCrossEntityReference icer) {
99       this.txn = txn;
100       this.icer = icer;
101     }
102
103     public NetworkTransaction getNetworkTransaction() {
104       return txn;
105     }
106
107     public IndexableCrossEntityReference getIndexableCrossEntityReference() {
108       return icer;
109     }
110   }
111
  /** Class logger; also handed to the parent synchronizer and to the named executor factory. */
  private static final Logger LOG =
      LoggerFactory.getInstance().getLogger(CrossEntityReferenceSynchronizer.class);

  /**
   * Entity type for which fetchDocumentForUpsert builds the self-link generic query from the
   * child key alone (no parent key).
   */
  private static final String SERVICE_INSTANCE = "service-instance";

  /** Self-links queued by processEntityTypeSelfLinks and drained by performSync. */
  private Deque<SelfLinkDescriptor> selflinks;
  /**
   * Transaction/document pairs waiting for a retry. Not consumed in the part of the file reviewed
   * here; presumably drained by performRetrySync -- TODO confirm.
   */
  private Deque<RetryCrossEntitySyncContainer> retryQueue;
  /**
   * Cleared at the end of launchSyncFlow so failed retries do not leak into later syncs.
   * Presumably maps a document key to its retry count -- TODO confirm against the retry logic.
   */
  private Map<String, Integer> retryLimitTracker;
  /**
   * Set to true by launchSyncFlow once all self-links have been collected; isSyncDone reports
   * "not done" while this is false.
   * NOTE(review): plain (non-volatile) field -- confirm which threads read it through
   * getState()/isSyncDone().
   */
  private boolean isAllWorkEnumerated;
  /** Executor named "CERS-ES-PUT" used for the ElasticSearch put/update tasks. */
  protected ExecutorService esPutExecutor;
  /** Source of the cross-entity-reference descriptors, keyed by entity type. */
  private CrossEntityReferenceLookup crossEntityReferenceLookup;
  /** Source of the OXM entity descriptors used to resolve primary key attribute names. */
  private OxmEntityLookup oxmEntityLookup;
  /** Stored by the constructor; not read in the part of the file reviewed here. */
  private SearchableEntityLookup searchableEntityLookup;
125   
126
127   /**
128    * Instantiates a new cross entity reference synchronizer.
129    *
130    * @param indexName the index name
131    * @throws Exception the exception
132    */
133   public CrossEntityReferenceSynchronizer(ElasticSearchSchemaConfig schemaConfig,
134       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
135       NetworkStatisticsConfig esStatConfig, CrossEntityReferenceLookup crossEntityReferenceLookup,
136       OxmEntityLookup oxmEntityLookup, SearchableEntityLookup searchableEntityLookup) throws Exception {
137     super(LOG, "CERS", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
138         aaiStatConfig, esStatConfig);
139     this.crossEntityReferenceLookup = crossEntityReferenceLookup;
140     this.oxmEntityLookup = oxmEntityLookup;
141     this.searchableEntityLookup = searchableEntityLookup;
142     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
143     this.retryQueue = new ConcurrentLinkedDeque<RetryCrossEntitySyncContainer>();
144     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
145     this.synchronizerName = "Cross Reference Entity Synchronizer";
146     this.isAllWorkEnumerated = false;
147     this.esPutExecutor = NodeUtils.createNamedExecutor("CERS-ES-PUT", 5, LOG);
148     this.aaiEntityStats.intializeEntityCounters(
149         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().keySet());
150
151     this.esEntityStats.intializeEntityCounters(
152         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().keySet());
153     this.syncDurationInMs = -1;
154   }
155
156   /* (non-Javadoc)
157    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
158    */
159   @Override
160   public OperationState doSync() {
161     this.syncDurationInMs = -1;
162         String txnID = NodeUtils.getRandomTxnId();
163     MdcContext.initialize(txnID, "CrossEntitySynchronizer", "", "Sync", "");
164         
165     resetCounters();
166     syncStartedTimeStampInMs = System.currentTimeMillis();
167     launchSyncFlow();
168     return OperationState.OK;
169   }
170
171   @Override
172   public SynchronizerState getState() {
173     if (!isSyncDone()) {
174       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
175     }
176
177     return SynchronizerState.IDLE;
178   }
179
180   /* (non-Javadoc)
181    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
182    */
183   @Override
184   public String getStatReport(boolean showFinalReport) {
185     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
186     return getStatReport(syncDurationInMs, showFinalReport);
187   }
188
189   /* (non-Javadoc)
190    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
191    */
192   @Override
193   public void shutdown() {
194     this.shutdownExecutors();
195   }
196
197   @Override
198   protected boolean isSyncDone() {
199     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
200
201     if (totalWorkOnHand > 0 || !isAllWorkEnumerated) {
202       return false;
203     }
204
205     return true;
206   }
207
208   /**
209    * Launch sync flow.
210    *
211    * @return the operation state
212    */
213   private OperationState launchSyncFlow() {
214         final Map<String,String> contextMap = MDC.getCopyOfContextMap();
215     Map<String, CrossEntityReferenceDescriptor> descriptorMap =
216         crossEntityReferenceLookup.getCrossReferenceEntityDescriptors();
217
218     if (descriptorMap.isEmpty()) {
219       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM);
220
221       return OperationState.ERROR;
222     }
223
224     Collection<String> syncTypes = descriptorMap.keySet();
225
226     try {
227
228       /*
229        * launch a parallel async thread to process the documents for each entity-type (to max the of
230        * the configured executor anyway)
231        */
232
233       aaiWorkOnHand.set(syncTypes.size());
234
235       for (String key : syncTypes) {
236
237         supplyAsync(new Supplier<Void>() {
238
239           @Override
240           public Void get() {
241                 MDC.setContextMap(contextMap);
242             OperationResult typeLinksResult = null;
243             try {
244               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
245               aaiWorkOnHand.decrementAndGet();
246               processEntityTypeSelfLinks(typeLinksResult);
247             } catch (Exception exc) {
248               LOG.error(AaiUiMsgs.ERROR_GENERIC,
249                   "An error occurred processing entity selflinks. Error: " + exc.getMessage());
250             }
251
252             return null;
253           }
254
255         }, aaiExecutor).whenComplete((result, error) -> {
256           if (error != null) {
257             LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, error.getMessage());
258           }
259         });
260       }
261
262       while (aaiWorkOnHand.get() != 0) {
263
264         if (LOG.isDebugEnabled()) {
265           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
266         }
267
268         Thread.sleep(1000);
269       }
270
271       aaiWorkOnHand.set(selflinks.size());
272       isAllWorkEnumerated = true;
273       performSync();
274
275       while (!isSyncDone()) {
276         performRetrySync();
277         Thread.sleep(1000);
278       }
279
280       /*
281        * Make sure we don't hang on to retries that failed which could cause issues during future
282        * syncs
283        */
284       retryLimitTracker.clear();
285
286     } catch (Exception exc) {
287       LOG.error(AaiUiMsgs.ERROR_GENERIC,
288           "An error occurred during entity synchronization. Error: " + exc.getMessage());
289
290     }
291
292     return OperationState.OK;
293   }
294
295   /**
296    * Perform sync.
297    */
298   private void performSync() {
299     while (selflinks.peek() != null) {
300
301       SelfLinkDescriptor linkDescriptor = selflinks.poll();
302       aaiWorkOnHand.decrementAndGet();
303
304       CrossEntityReferenceDescriptor descriptor = null;
305
306       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
307
308         descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors()
309             .get(linkDescriptor.getEntityType());
310
311         if (descriptor == null) {
312           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
313           // go to next element in iterator
314           continue;
315         }
316
317         if (descriptor.hasCrossEntityReferences()) {
318
319           NetworkTransaction txn = new NetworkTransaction();
320           txn.setDescriptor(descriptor);
321           txn.setLink(linkDescriptor.getSelfLink());
322           txn.setQueryParameters(linkDescriptor.getDepthModifier());
323           txn.setOperationType(HttpMethod.GET);
324           txn.setEntityType(linkDescriptor.getEntityType());
325
326           aaiWorkOnHand.incrementAndGet();
327
328           supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
329               .whenComplete((result, error) -> {
330
331                 aaiWorkOnHand.decrementAndGet();
332
333                 if (error != null) {
334                   LOG.error(AaiUiMsgs.SELF_LINK_GET, error.getLocalizedMessage());
335                 } else {
336                   if (result == null) {
337                     LOG.error(AaiUiMsgs.SELF_LINK_CROSS_REF_SYNC);
338                   } else {
339                     updateActiveInventoryCounters(result);
340                     fetchDocumentForUpsert(result);
341                   }
342                 }
343               });
344         }
345       }
346     }
347   }
348
349   /**
350    * Process entity type self links.
351    *
352    * @param operationResult the operation result
353    */
354   private void processEntityTypeSelfLinks(OperationResult operationResult) {
355
356     JsonNode rootNode = null;
357
358     final String jsonResult = operationResult.getResult();
359
360     if (jsonResult != null && jsonResult.length() > 0) {
361
362       try {
363         rootNode = mapper.readTree(jsonResult);
364       } catch (IOException exc) {
365         // TODO // TODO -> LOG, waht should be logged here?
366       }
367
368       JsonNode resultData = rootNode.get("result-data");
369       ArrayNode resultDataArrayNode = null;
370
371       if (resultData.isArray()) {
372         resultDataArrayNode = (ArrayNode) resultData;
373
374         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
375         JsonNode element = null;
376
377         while (elementIterator.hasNext()) {
378           element = elementIterator.next();
379
380           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
381           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
382
383           CrossEntityReferenceDescriptor descriptor = null;
384
385           if (resourceType != null && resourceLink != null) {
386             descriptor = crossEntityReferenceLookup.getCrossReferenceEntityDescriptors().get(resourceType);
387
388             if (descriptor == null) {
389               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
390               // go to next element in iterator
391               continue;
392             }
393             if (descriptor.hasCrossEntityReferences()) {
394               selflinks.add(new SelfLinkDescriptor(
395                   resourceLink,SynchronizerConstants.DEPTH_ALL_MODIFIER, resourceType));
396             }
397           }
398         }
399       }
400     }
401   }
402
403   
404   
405   /**
406    * By providing the entity type and a json node for the entity, determine the
407    * primary key name(s) + primary key value(s) sufficient to build an entity query string
408    * of the following format:
409    * 
410    *      <entityType>.<primaryKeyNames>:<primaryKeyValues>
411    * 
412    * @return - a composite string in the above format or null
413    */
414   private String determineEntityQueryString(String entityType, JsonNode entityJsonNode) {
415     
416     OxmEntityDescriptor entityDescriptor =
417         oxmEntityLookup.getEntityDescriptors().get(entityType);
418     
419     String queryString = null;
420     
421     if ( entityDescriptor != null ) {
422
423       final List<String> primaryKeyNames = entityDescriptor.getPrimaryKeyAttributeNames();
424       final List<String> keyValues = new ArrayList<String>();
425       NodeUtils.extractFieldValuesFromObject(entityJsonNode, primaryKeyNames, keyValues);
426
427       queryString = entityType + "." + NodeUtils.concatArray(primaryKeyNames,"/") + ":" + NodeUtils.concatArray(keyValues);
428
429     } 
430     
431     return queryString;
432
433     
434   }
435   
436   /**
437    * Fetch document for upsert.
438    *
439    * @param txn the txn
440    */
441   private void fetchDocumentForUpsert(NetworkTransaction txn) {
442     
443     if (!txn.getOperationResult().wasSuccessful()) {
444       LOG.error(AaiUiMsgs.SELF_LINK_GET, txn.getOperationResult().getResult());
445       return;
446     }
447
448     CrossEntityReferenceDescriptor cerDescriptor = crossEntityReferenceLookup
449         .getCrossReferenceEntityDescriptors().get(txn.getDescriptor().getEntityName());
450     
451     if (cerDescriptor != null && cerDescriptor.hasCrossEntityReferences()) {
452
453       final String jsonResult = txn.getOperationResult().getResult();
454       
455       if (jsonResult != null && jsonResult.length() > 0) {
456         
457         /**
458          * Here's what we are going to do:
459          * 
460          * <li>Extract primary key name and value from the parent type.
461          * <li>Extract the primary key and value from the nested child instance.
462          * <li>Build a generic query to discover the self-link for the nested-child-instance using
463          * parent and child.
464          * <li>Set the self-link on the child.
465          * <li>Generate the id that will allow the elastic-search upsert to work.
466          * <li>Rinse and repeat.
467          */
468           
469           CrossEntityReference cerDefinition = cerDescriptor.getCrossEntityReference();
470
471           if (cerDefinition != null) {
472             JsonNode convertedNode = null;
473             try {
474               convertedNode = NodeUtils.convertJsonStrToJsonNode(txn.getOperationResult().getResult());
475               
476               final String parentEntityQueryString = determineEntityQueryString(txn.getEntityType(), convertedNode);
477               
478               List<String> extractedParentEntityAttributeValues = new ArrayList<String>();
479
480               NodeUtils.extractFieldValuesFromObject(convertedNode,
481                   cerDefinition.getReferenceAttributes(),
482                   extractedParentEntityAttributeValues);
483
484               List<JsonNode> nestedTargetEntityInstances = new ArrayList<JsonNode>();
485               NodeUtils.extractObjectsByKey(convertedNode, cerDefinition.getTargetEntityType(),
486                   nestedTargetEntityInstances);
487
488               for (JsonNode targetEntityInstance : nestedTargetEntityInstances) {
489
490                 if (cerDescriptor != null) {
491                   
492                   String childEntityType = cerDefinition.getTargetEntityType();
493                   OxmEntityDescriptor childDesciptor = oxmEntityLookup.getEntityDescriptors().get(childEntityType);
494                   
495                   List<String> childPrimaryKeyNames = null;
496                   
497                   if (childDesciptor != null) {
498                     childPrimaryKeyNames = childDesciptor.getPrimaryKeyAttributeNames();
499                   } else {
500                     childPrimaryKeyNames = new ArrayList<String>();
501                   }
502                                 
503                   List<String> childKeyValues = new ArrayList<String>();
504                   NodeUtils.extractFieldValuesFromObject(targetEntityInstance, childPrimaryKeyNames, childKeyValues);
505                   
506                   String childEntityQueryKeyString = childEntityType + "." + NodeUtils.concatArray(childPrimaryKeyNames,"/") + ":" + NodeUtils.concatArray(childKeyValues);
507                   
508                   /**
509                    * Build generic-query to query child instance self-link from AAI
510                    */
511                   List<String> orderedQueryKeyParams = new ArrayList<String>();
512
513                   /**
514                    * At present, there is an issue with resolving the self-link using the
515                    * generic-query with nothing more than the service-instance identifier and the
516                    * service-subscription. There is another level of detail we don't have access to
517                    * unless we parse it out of the service-subscription self-link, which is a
518                    * coupling I would like to avoid. Fortunately, there is a workaround, but only
519                    * for service-instances, which is presently our only use-case for the
520                    * cross-entity-reference in R1707. Going forwards hopefully there will be other
521                    * ways to resolve a child self-link using parental embedded meta data that we
522                    * don't currently have.
523                    * 
524                    * The work-around with the service-instance entity-type is that it's possible to
525                    * request the self-link using only the service-instance-id because of a
526                    * historical AAI functional query requirement that it be possible to query a
527                    * service-instance only by it's service-instance-id. This entity type is the only
528                    * one in the system that can be queried this way which makes it a very limited
529                    * workaround, but good enough for the current release.
530                    */
531
532                   if (SERVICE_INSTANCE.equals(childEntityType)) {
533                     orderedQueryKeyParams.clear();
534                     orderedQueryKeyParams.add(childEntityQueryKeyString);
535                   } else {
536                     orderedQueryKeyParams.add(parentEntityQueryString);
537                     orderedQueryKeyParams.add(childEntityQueryKeyString);
538                   }
539
540                   String genericQueryStr = null;
541                   try {
542                     genericQueryStr = aaiAdapter.getGenericQueryForSelfLink(childEntityType, orderedQueryKeyParams);
543                     
544                     if (genericQueryStr != null) {
545                       aaiWorkOnHand.incrementAndGet();
546
547                       OperationResult aaiQueryResult = aaiAdapter.queryActiveInventoryWithRetries(
548                           genericQueryStr, "application/json",
549                           aaiAdapter.getEndpointConfig().getNumRequestRetries());
550
551                       aaiWorkOnHand.decrementAndGet();
552
553                       if (aaiQueryResult!= null && aaiQueryResult.wasSuccessful()) {
554                         
555                         Collection<JsonNode> entityLinks = new ArrayList<JsonNode>();
556                         JsonNode genericQueryResult = null;
557                         try {
558                           genericQueryResult = NodeUtils.convertJsonStrToJsonNode(aaiQueryResult.getResult());
559                           
560                           if ( genericQueryResult != null ) {
561                             
562                             NodeUtils.extractObjectsByKey(genericQueryResult, "resource-link", entityLinks);
563
564                             String selfLink = null;
565
566                             if (entityLinks.size() != 1) {
567                               /**
568                                * an ambiguity exists where we can't reliably determine the self
569                                * link, this should be a permanent error
570                                */
571                               LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_SELFLINK_AMBIGUITY, String.valueOf(entityLinks.size()));
572                           } else {
573                             selfLink = ((JsonNode) entityLinks.toArray()[0]).asText();
574
575
576                             IndexableCrossEntityReference icer =
577                                 getPopulatedDocument(targetEntityInstance, cerDescriptor);
578
579                             for (String parentCrossEntityReferenceAttributeValue : extractedParentEntityAttributeValues) {
580                               icer.addCrossEntityReferenceValue(
581                                   parentCrossEntityReferenceAttributeValue);
582                             }
583
584                             icer.setLink(ActiveInventoryAdapter.extractResourcePath(selfLink));
585
586                             icer.deriveFields();
587
588                             String link = null;
589                             try {
590                               link = elasticSearchAdapter
591                                   .buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
592                             } catch (Exception exc) {
593                               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY,
594                                   exc.getLocalizedMessage());
595                             }
596
597                             if (link != null) {
598                               NetworkTransaction n2 = new NetworkTransaction();
599                               n2.setLink(link);
600                               n2.setEntityType(txn.getEntityType());
601                               n2.setDescriptor(txn.getDescriptor());
602                               n2.setOperationType(HttpMethod.GET);
603
604                               esWorkOnHand.incrementAndGet();
605
606                               supplyAsync(
607                                   new PerformElasticSearchRetrieval(n2, elasticSearchAdapter),
608                                   esExecutor).whenComplete((result, error) -> {
609
610                                     esWorkOnHand.decrementAndGet();
611
612                                     if (error != null) {
613                                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED,
614                                           error.getLocalizedMessage());
615                                     } else {
616                                       updateElasticSearchCounters(result);
617                                       performDocumentUpsert(result, icer);
618                                     }
619                                   });
620                             }
621
622                           }
623                           } else {
624                             LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DURING_AAI_RESPONSE_CONVERSION);
625                           }
626
627                         } catch (Exception exc) {
628                           LOG.error(AaiUiMsgs.JSON_CONVERSION_ERROR, JsonNode.class.toString(), exc.getLocalizedMessage());
629                         }
630                         
631                       } else {
632                         String message = "Entity sync failed because AAI query failed with error " + aaiQueryResult.getResult(); 
633                         LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
634                       }
635                       
636                     } else {
637                       String message = "Entity Sync failed because generic query str could not be determined.";
638                       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
639                     }
640                   } catch (Exception exc) {
641                     String message = "Failed to sync entity because generation of generic query failed with error = " + exc.getMessage();
642                     LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_QUERY_ERROR, message);
643                   }
644                   
645                 }
646               }
647               
648             } catch (IOException ioe) {
649               LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, ioe.getMessage());
650             }
651           }
652           
653         } 
654       
655     } else {
656       LOG.error(AaiUiMsgs.ENTITY_SYNC_FAILED_DESCRIPTOR_NOT_FOUND, txn.getEntityType());
657     }
658   }
659
660   /**
661    * Perform document upsert.
662    *
663    * @param esGetResult the es get result
664    * @param icer the icer
665    */
666   protected void performDocumentUpsert(NetworkTransaction esGetResult,
667       IndexableCrossEntityReference icer) {
668     /**
669      * <p>
670      * <ul>
671      * As part of the response processing we need to do the following:
672      * <li>1. Extract the version (if present), it will be the ETAG when we use the
673      * Search-Abstraction-Service
674      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
675      * tag
676      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
677      * <li>b) if version != null, do PUT with _update?version= (versionNumber) in the URI to elastic
678      * </ul>
679      * </p>
680      */
681     String link = null;
682     try {
683       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
684     } catch (Exception exc) {
685       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
686       return;
687     }
688
689     boolean wasEntryDiscovered = false;
690     String versionNumber = null;
691     if (esGetResult.getOperationResult().getResultCode() == 404) {
692       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, icer.getEntityPrimaryKeyValue());
693     } else if (esGetResult.getOperationResult().getResultCode() == 200) {
694       wasEntryDiscovered = true;
695       try {
696         versionNumber = NodeUtils.extractFieldValueFromObject(
697             NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
698             "_version");
699       } catch (IOException exc) {
700         LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "version Number",
701             icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
702         return;
703       }
704     } else {
705       /*
706        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
707        * return.
708        */
709       LOG.info(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
710           String.valueOf(esGetResult.getOperationResult().getResultCode()));
711       return;
712     }
713
714     try {
715       String jsonPayload = null;
716       if (wasEntryDiscovered) {
717         try {
718           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
719           NodeUtils.extractObjectsByKey(
720               NodeUtils.convertJsonStrToJsonNode(esGetResult.getOperationResult().getResult()),
721               "_source", sourceObject);
722
723           if (!sourceObject.isEmpty()) {
724             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
725             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
726             ObjectReader updater = mapper.readerForUpdating(me);
727             MergableEntity merged = updater.readValue(icer.getAsJson());
728             jsonPayload = mapper.writeValueAsString(merged);
729           }
730         } catch (IOException exc) {
731           LOG.error(AaiUiMsgs.ES_ABORT_CROSS_ENTITY_REF_SYNC, "source value",
732               icer.getEntityPrimaryKeyValue(), exc.getLocalizedMessage());
733           return;
734         }
735       } else {
736         jsonPayload = icer.getAsJson();
737       }
738
739       if (wasEntryDiscovered) {
740         if (versionNumber != null && jsonPayload != null) {
741
742           String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
743               "default", icer.getId(), versionNumber, jsonPayload);
744
745           NetworkTransaction transactionTracker = new NetworkTransaction();
746           transactionTracker.setEntityType(esGetResult.getEntityType());
747           transactionTracker.setDescriptor(esGetResult.getDescriptor());
748           transactionTracker.setOperationType(HttpMethod.PUT);
749
750           esWorkOnHand.incrementAndGet();
751           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
752               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
753                   .whenComplete((result, error) -> {
754
755                     esWorkOnHand.decrementAndGet();
756
757                     if (error != null) {
758                       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
759                     } else {
760                       updateElasticSearchCounters(result);
761                       processStoreDocumentResult(result, esGetResult, icer);
762                     }
763                   });
764         }
765
766       } else {
767         if (link != null && jsonPayload != null) {
768
769           NetworkTransaction updateElasticTxn = new NetworkTransaction();
770           updateElasticTxn.setLink(link);
771           updateElasticTxn.setEntityType(esGetResult.getEntityType());
772           updateElasticTxn.setDescriptor(esGetResult.getDescriptor());
773           updateElasticTxn.setOperationType(HttpMethod.PUT);
774
775           esWorkOnHand.incrementAndGet();
776           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
777               esPutExecutor).whenComplete((result, error) -> {
778
779                 esWorkOnHand.decrementAndGet();
780
781                 if (error != null) {
782                   LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, error.getLocalizedMessage());
783                 } else {
784                   updateElasticSearchCounters(result);
785                   processStoreDocumentResult(result, esGetResult, icer);
786                 }
787               });
788         }
789       }
790     } catch (Exception exc) {
791       LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_REF_PUT, exc.getLocalizedMessage());
792     }
793   }
794
795   /**
796    * Process store document result.
797    *
798    * @param esPutResult the es put result
799    * @param esGetResult the es get result
800    * @param icer the icer
801    */
802   private void processStoreDocumentResult(NetworkTransaction esPutResult,
803       NetworkTransaction esGetResult, IndexableCrossEntityReference icer) {
804
805     OperationResult or = esPutResult.getOperationResult();
806
807     if (!or.wasSuccessful()) {
808       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
809
810         if (shouldAllowRetry(icer.getId())) {
811
812           esWorkOnHand.incrementAndGet();
813
814           RetryCrossEntitySyncContainer rsc = new RetryCrossEntitySyncContainer(esGetResult, icer);
815           retryQueue.push(rsc);
816
817           LOG.warn(AaiUiMsgs.ES_CROSS_REF_SYNC_VERSION_CONFLICT);
818         }
819       } else {
820         LOG.error(AaiUiMsgs.ES_CROSS_REF_SYNC_FAILURE, String.valueOf(or.getResultCode()),
821             or.getResult());
822       }
823     }
824   }
825
826   /**
827    * Perform retry sync.
828    */
829   private void performRetrySync() {
830     while (retryQueue.peek() != null) {
831
832       RetryCrossEntitySyncContainer rsc = retryQueue.poll();
833       if (rsc != null) {
834
835         IndexableCrossEntityReference icer = rsc.getIndexableCrossEntityReference();
836         NetworkTransaction txn = rsc.getNetworkTransaction();
837
838         String link = null;
839         try {
840           // In this retry flow the icer object has already
841           // derived its fields
842           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), icer.getId());
843         } catch (Exception exc) {
844           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
845         }
846
847         if (link != null) {
848           NetworkTransaction retryTransaction = new NetworkTransaction();
849           retryTransaction.setLink(link);
850           retryTransaction.setEntityType(txn.getEntityType());
851           retryTransaction.setDescriptor(txn.getDescriptor());
852           retryTransaction.setOperationType(HttpMethod.GET);
853
854           /*
855            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow and we did
856            * that for this request already when queuing the failed PUT!
857            */
858
859           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
860               esExecutor).whenComplete((result, error) -> {
861
862                 esWorkOnHand.decrementAndGet();
863
864                 if (error != null) {
865                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
866                 } else {
867                   updateElasticSearchCounters(result);
868                   performDocumentUpsert(result, icer);
869                 }
870               });
871         }
872
873       }
874     }
875   }
876
877   /**
878    * Should allow retry.
879    *
880    * @param id the id
881    * @return true, if successful
882    */
883   private boolean shouldAllowRetry(String id) {
884     boolean isRetryAllowed = true;
885     if (retryLimitTracker.get(id) != null) {
886       Integer currentCount = retryLimitTracker.get(id);
887       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
888         isRetryAllowed = false;
889         LOG.error(AaiUiMsgs.ES_CROSS_ENTITY_RESYNC_LIMIT, id);
890       } else {
891         Integer newCount = new Integer(currentCount.intValue() + 1);
892         retryLimitTracker.put(id, newCount);
893       }
894
895     } else {
896       Integer firstRetryCount = new Integer(1);
897       retryLimitTracker.put(id, firstRetryCount);
898     }
899
900     return isRetryAllowed;
901   }
902
903   /**
904    * Gets the populated document.
905    *
906    * @param entityNode the entity node
907    * @param resultDescriptor the result descriptor
908    * @return the populated document
909    * @throws JsonProcessingException the json processing exception
910    * @throws IOException Signals that an I/O exception has occurred.
911    */
912   protected IndexableCrossEntityReference getPopulatedDocument(JsonNode entityNode,
913       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
914
915     IndexableCrossEntityReference icer = new IndexableCrossEntityReference();
916
917     icer.setEntityType(resultDescriptor.getEntityName());
918
919     List<String> primaryKeyValues = new ArrayList<String>();
920     String pkeyValue = null;
921
922     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
923       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
924       if (pkeyValue != null) {
925         primaryKeyValues.add(pkeyValue);
926       } else {
927         LOG.warn(AaiUiMsgs.ES_PKEYVALUE_NULL, resultDescriptor.getEntityName());
928       }
929     }
930
931     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
932     icer.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
933
934     return icer;
935
936   }
937 }