6d63a8a56ce3760a861a4b15ff03a9bbdacb6b48
[aai/sparky-be.git] / src / test / java / org / onap / aai / sparky / viewandinspect / sync / ViewInspectGizmoEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.viewandinspect.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Deque;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.ExecutorService;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.OxmEntityContainerLookup;
44 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
45 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
46 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
47 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
48 import org.onap.aai.sparky.dal.GizmoAdapter;
49 import org.onap.aai.sparky.dal.NetworkTransaction;
50 import org.onap.aai.sparky.dal.rest.HttpMethod;
51 import org.onap.aai.sparky.logging.AaiUiMsgs;
52 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
53 import org.onap.aai.sparky.sync.IndexSynchronizer;
54 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
55 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
56 import org.onap.aai.sparky.sync.entity.MergableEntity;
57 import org.onap.aai.sparky.sync.entity.SearchableEntity;
58 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
59 import org.onap.aai.sparky.sync.enumeration.OperationState;
60 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
61 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
62 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
63 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
64 import org.onap.aai.sparky.sync.task.PerformGizmoRetrieval;
65 import org.onap.aai.sparky.util.NodeUtils;
66 import org.slf4j.MDC;
67
68 import com.fasterxml.jackson.core.JsonProcessingException;
69 import com.fasterxml.jackson.databind.JsonNode;
70 import com.fasterxml.jackson.databind.ObjectReader;
71 import com.fasterxml.jackson.databind.node.ArrayNode;
72
73 /**
74  * The Class SearchableEntitySynchronizer.
75  */
76 public class ViewInspectGizmoEntitySynchronizer extends AbstractEntitySynchronizer
77     implements IndexSynchronizer {
78
79   /**
80    * The Class RetrySearchableEntitySyncContainer.
81    */
82   private class RetrySearchableEntitySyncContainer {
83     NetworkTransaction txn;
84     SearchableEntity se;
85
86     /**
87      * Instantiates a new retry searchable entity sync container.
88      *
89      * @param txn the txn
90      * @param se the se
91      */
92     public RetrySearchableEntitySyncContainer(NetworkTransaction txn, SearchableEntity se) {
93       this.txn = txn;
94       this.se = se;
95     }
96
97     public NetworkTransaction getNetworkTransaction() {
98       return txn;
99     }
100
101     public SearchableEntity getSearchableEntity() {
102       return se;
103     }
104   }
105
106   private static final Logger LOG =
107       LoggerFactory.getInstance().getLogger(ViewInspectGizmoEntitySynchronizer.class);
108
109   private boolean allWorkEnumerated;
110   private Deque<SelfLinkDescriptor> selflinks;
111   private Deque<RetrySearchableEntitySyncContainer> retryQueue;
112   private Map<String, Integer> retryLimitTracker;
113   protected ExecutorService esPutExecutor;
114   private OxmEntityLookup oxmEntityLookup;
115   private SearchableEntityLookup searchableEntityLookup;
116   private GizmoAdapter gizmoAdapter;
117   private OxmEntityContainerLookup entityContainerLookup;
118
119   /**
120    * Instantiates a new searchable entity synchronizer.
121    *
122    * @param indexName the index name
123    * @throws Exception the exception
124    */
125   public ViewInspectGizmoEntitySynchronizer(ElasticSearchSchemaConfig schemaConfig,
126       int internalSyncWorkers, int gizmoWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
127       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
128       SearchableEntityLookup searchableEntityLookup, OxmEntityContainerLookup entityContainerLookup) throws Exception {
129     super(LOG, "SES", internalSyncWorkers, gizmoWorkers, esWorkers, schemaConfig.getIndexName(),
130         aaiStatConfig, esStatConfig);
131     
132     this.oxmEntityLookup = oxmEntityLookup;
133     this.searchableEntityLookup = searchableEntityLookup;
134     this.entityContainerLookup = entityContainerLookup;
135     this.allWorkEnumerated = false;
136     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
137     this.retryQueue = new ConcurrentLinkedDeque<RetrySearchableEntitySyncContainer>();
138     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
139     this.synchronizerName = "Searchable Entity Synchronizer";
140     this.esPutExecutor = NodeUtils.createNamedExecutor("SES-ES-PUT", 5, LOG);
141     this.aaiEntityStats.intializeEntityCounters(
142         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
143     this.esEntityStats.intializeEntityCounters(
144         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
145     this.syncDurationInMs = -1;
146   }
147   
148   
149
150   public GizmoAdapter getGizmoAdapter() {
151         return gizmoAdapter;
152 }
153
154
155
156 public void setGizmoAdapter(GizmoAdapter gizmoAdapter) {
157         this.gizmoAdapter = gizmoAdapter;
158 }
159
160
161
162 /**
163    * Collect all the work.
164    *
165    * @return the operation state
166    */
167   private OperationState collectAllTheWork() {
168     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
169     
170     Collection<String> searchableEntityGroups = entityContainerLookup.getSearchableEntityGroups();
171     
172     if (searchableEntityGroups.isEmpty()) {
173       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
174       return OperationState.ERROR;
175     }
176
177
178     try {
179
180       /*
181        * launch a parallel async thread to process the documents for each entity-type (to max the
182        * of the configured executor anyway)
183        */
184
185         /*searchableEntityGroups = new ArrayList<String>();
186         searchableEntityGroups.add("pservers");*/
187         
188       aaiWorkOnHand.set(searchableEntityGroups.size());
189
190       for (String searchableEntityGroup : searchableEntityGroups) {
191
192         supplyAsync(new Supplier<Void>() {
193
194           @Override
195           public Void get() {
196             MDC.setContextMap(contextMap);
197             OperationResult typeLinksResult = null;
198             try {
199               typeLinksResult = gizmoAdapter.getSelfLinksByEntityType(searchableEntityGroup);
200               aaiWorkOnHand.decrementAndGet();
201               processEntityTypeSelfLinks(typeLinksResult);
202             } catch (Exception exc) {
203
204                 exc.printStackTrace();
205             }
206
207             return null;
208           }
209
210         }, aaiExecutor).whenComplete((result, error) -> {
211
212           if (error != null) {
213             LOG.error(AaiUiMsgs.ERROR_GENERIC,
214                 "An error occurred getting data from AAI. Error = " + error.getMessage());
215           }
216         });
217
218       }
219
220       while (aaiWorkOnHand.get() != 0) {
221
222         if (LOG.isDebugEnabled()) {
223           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
224         }
225
226         Thread.sleep(1000);
227       }
228
229       aaiWorkOnHand.set(selflinks.size());
230       allWorkEnumerated = true;
231       syncEntityTypes();
232
233       while (!isSyncDone()) {
234         performRetrySync();
235         Thread.sleep(1000);
236       }
237
238       /*
239        * Make sure we don't hang on to retries that failed which could cause issues during future
240        * syncs
241        */
242       retryLimitTracker.clear();
243
244     } catch (Exception exc) {
245       // TODO -> LOG, waht should be logged here?
246     }
247
248     return OperationState.OK;
249   }
250
251   /* (non-Javadoc)
252    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
253    */
254   @Override
255   public OperationState doSync() {
256     this.syncDurationInMs = -1;
257     String txnID = NodeUtils.getRandomTxnId();
258     MdcContext.initialize(txnID, "SearchableEntitySynchronizer", "", "Sync", "");
259     
260     resetCounters();
261     this.allWorkEnumerated = false;
262     syncStartedTimeStampInMs = System.currentTimeMillis();
263     collectAllTheWork();
264
265     return OperationState.OK;
266   }
267
268   /**
269    * Process entity type self links.
270    *
271    * @param operationResult the operation result
272    */
273         private void processEntityTypeSelfLinks(OperationResult operationResult) {
274
275                 JsonNode rootNode = null;
276
277                 final String jsonResult = operationResult.getResult();
278
279                 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
280                         
281                         try {
282                                 rootNode = mapper.readTree(jsonResult);
283                         } catch (IOException exc) {
284                                 String message = "Could not deserialize JSON (representing operation result) as node tree. "
285                                                 + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
286                                 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
287                         }
288
289                         ArrayNode resultDataArrayNode = null;
290
291                         if (rootNode.isArray()) {
292                                 resultDataArrayNode = (ArrayNode) rootNode;
293                                 
294                                 Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
295                                 JsonNode element = null;
296
297                                 while (elementIterator.hasNext()) {
298                                         element = elementIterator.next();
299
300                                         final String id = NodeUtils.getNodeFieldAsText(element, "id");
301                                         final String type = NodeUtils.getNodeFieldAsText(element, "type");
302                                         final String url = NodeUtils.getNodeFieldAsText(element, "url");
303
304                                         String resourceLink;
305                                         try {
306                                                 resourceLink = gizmoAdapter.getFullInventoryUrl(type + "/" + id);
307                                                 selflinks.add(new SelfLinkDescriptor(NodeUtils.extractRawGizmoPathWithoutVersion(resourceLink), null, type));
308                                         } catch (Exception e) {
309                                           LOG.error(AaiUiMsgs.ERROR_GENERIC, "ERROR:  Failed to determine resource link caused by " + e.getMessage());
310                                         }
311
312                                 }
313                         }
314                 }
315
316         }
317
318   /**
319    * Sync entity types.
320    */
321   private void syncEntityTypes() {
322
323     while (selflinks.peek() != null) {
324
325       SelfLinkDescriptor linkDescriptor = selflinks.poll();
326       aaiWorkOnHand.decrementAndGet();
327
328       OxmEntityDescriptor descriptor = null;
329
330       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
331
332         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
333
334         if (descriptor == null) {
335           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
336           continue;
337         }
338
339         NetworkTransaction txn = new NetworkTransaction();
340         txn.setDescriptor(descriptor);
341         txn.setLink(linkDescriptor.getSelfLink());
342         txn.setOperationType(HttpMethod.GET);
343         txn.setEntityType(linkDescriptor.getEntityType());
344
345         aaiWorkOnHand.incrementAndGet();
346
347         supplyAsync(new PerformGizmoRetrieval(txn, gizmoAdapter), aaiExecutor)
348             .whenComplete((result, error) -> {
349
350               aaiWorkOnHand.decrementAndGet();
351
352               if (error != null) {
353                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
354               } else {
355                 if (result == null) {
356                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
357                       linkDescriptor.getSelfLink());
358                 } else {
359                   updateActiveInventoryCounters(result);
360                   fetchDocumentForUpsert(result);
361                 }
362               }
363             });
364       }
365
366     }
367
368   }
369
370   /**
371    * Perform document upsert.
372    *
373    * @param esGetTxn the es get txn
374    * @param se the se
375    */
376   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SearchableEntity se) {
377     /**
378      * <p>
379      * <ul>
380      * As part of the response processing we need to do the following:
381      * <li>1. Extract the version (if present), it will be the ETAG when we use the
382      * Search-Abstraction-Service
383      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
384      * tag
385      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
386      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
387      * </ul>
388      * </p>
389      */
390     String link = null;
391     try {
392       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
393     } catch (Exception exc) {
394       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
395       return;
396     }
397
398     String versionNumber = null;
399     boolean wasEntryDiscovered = false;
400     if (esGetTxn.getOperationResult().getResultCode() == 404) {
401       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, se.getEntityPrimaryKeyValue());
402     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
403       wasEntryDiscovered = true;
404       try {
405         versionNumber = NodeUtils.extractFieldValueFromObject(
406             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
407             "_version");
408       } catch (IOException exc) {
409         String message =
410             "Error extracting version number from response, aborting searchable entity sync of "
411                 + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
412         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
413         return;
414       }
415     } else {
416       /*
417        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
418        * return.
419        */
420       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
421           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
422       return;
423     }
424
425     try {
426       String jsonPayload = null;
427       if (wasEntryDiscovered) {
428         try {
429           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
430           NodeUtils.extractObjectsByKey(
431               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
432               "_source", sourceObject);
433
434           if (!sourceObject.isEmpty()) {
435             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
436             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
437             ObjectReader updater = mapper.readerForUpdating(me);
438             MergableEntity merged = updater.readValue(NodeUtils.convertObjectToJson(se,false));
439             jsonPayload = mapper.writeValueAsString(merged);
440           }
441         } catch (IOException exc) {
442           String message =
443               "Error extracting source value from response, aborting searchable entity sync of "
444                   + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
445           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
446           return;
447         }
448       } else {
449           jsonPayload = se.getAsJson();
450       }
451
452       if (wasEntryDiscovered) {
453         if (versionNumber != null && jsonPayload != null) {
454
455           String requestPayload = elasticSearchAdapter.buildBulkImportOperationRequest(getIndexName(),
456               "default", se.getId(), versionNumber, jsonPayload);
457
458           NetworkTransaction transactionTracker = new NetworkTransaction();
459           transactionTracker.setEntityType(esGetTxn.getEntityType());
460           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
461           transactionTracker.setOperationType(HttpMethod.PUT);
462
463           esWorkOnHand.incrementAndGet();
464           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
465               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
466                   .whenComplete((result, error) -> {
467
468                     esWorkOnHand.decrementAndGet();
469
470                     if (error != null) {
471                       String message = "Searchable entity sync UPDATE PUT error - "
472                           + error.getLocalizedMessage();
473                       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
474                     } else {
475                       updateElasticSearchCounters(result);
476                       processStoreDocumentResult(result, esGetTxn, se);
477                     }
478                   });
479         }
480
481       } else {
482         
483         if (link != null && jsonPayload != null) {
484
485           NetworkTransaction updateElasticTxn = new NetworkTransaction();
486           updateElasticTxn.setLink(link);
487           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
488           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
489           updateElasticTxn.setOperationType(HttpMethod.PUT);
490
491           esWorkOnHand.incrementAndGet();
492           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
493               esPutExecutor).whenComplete((result, error) -> {
494
495                 esWorkOnHand.decrementAndGet();
496
497                 if (error != null) {
498                   String message =
499                       "Searchable entity sync UPDATE PUT error - " + error.getLocalizedMessage();
500                   LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
501                 } else {
502                   updateElasticSearchCounters(result);
503                   processStoreDocumentResult(result, esGetTxn, se);
504                 }
505               });
506         }
507       }
508     } catch (Exception exc) {
509       String message = "Exception caught during searchable entity sync PUT operation. Message - "
510           + exc.getLocalizedMessage();
511       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
512     }
513   }
514
515   /**
516    * Populate searchable entity document.
517    *
518    * @param doc the doc
519    * @param result the result
520    * @param resultDescriptor the result descriptor
521    * @throws JsonProcessingException the json processing exception
522    * @throws IOException Signals that an I/O exception has occurred.
523    */
524   protected void populateSearchableEntityDocument(SearchableEntity doc, String result,
525       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
526
527     doc.setEntityType(resultDescriptor.getEntityName());
528
529     JsonNode entityNode = mapper.readTree(result);
530     
531     String id = NodeUtils.getNodeFieldAsText(entityNode, "id");
532     String type = NodeUtils.getNodeFieldAsText(entityNode, "type");
533     String url = NodeUtils.getNodeFieldAsText(entityNode, "url");
534     
535     JsonNode properties = entityNode.get("properties");
536     
537     Iterator<String> fieldNames = properties.fieldNames();
538     
539     
540
541     List<String> primaryKeyValues = new ArrayList<String>();
542     String pkeyValue = null;
543
544     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup.getSearchableEntityDescriptors().get(resultDescriptor.getEntityName());
545     
546     for (String keyName : searchableDescriptor.getPrimaryKeyAttributeNames()) {
547       pkeyValue = NodeUtils.getNodeFieldAsText(properties, keyName);
548       if (pkeyValue != null) {
549         primaryKeyValues.add(pkeyValue);
550       } else {
551         String message = "populateSearchableEntityDocument(), pKeyValue is null for entityType = "
552             + resultDescriptor.getEntityName();
553         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
554       }
555     }
556
557     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
558     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
559
560     final List<String> searchTagFields = searchableDescriptor.getSearchableAttributes();
561
562     /*
563      * Based on configuration, use the configured field names for this entity-Type to build a
564      * multi-value collection of search tags for elastic search entity search criteria.
565      */
566     for (String searchTagField : searchTagFields) {
567       String searchTagValue = NodeUtils.getNodeFieldAsText(properties, searchTagField);
568       if (searchTagValue != null && !searchTagValue.isEmpty()) {
569         doc.addSearchTagWithKey(searchTagValue, searchTagField);
570       }
571     }
572   }
573
574   /**
575    * Fetch document for upsert.
576    *
577    * @param txn the txn
578    */
579   private void fetchDocumentForUpsert(NetworkTransaction txn) {
580     if (!txn.getOperationResult().wasSuccessful()) {
581       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
582       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
583       return;
584     }
585
586     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup
587         .getSearchableEntityDescriptors().get(txn.getDescriptor().getEntityName());
588     
589     try {
590       if (searchableDescriptor.hasSearchableAttributes()) {
591
592         final String jsonResult = txn.getOperationResult().getResult();
593         if (jsonResult != null && jsonResult.length() > 0) {
594
595                  SearchableEntity se = new SearchableEntity();
596              se.setLink( txn.getLink() );
597              populateSearchableEntityDocument(se, jsonResult, searchableDescriptor);
598              se.deriveFields(); 
599                 
600
601           String link = null;
602           try {
603             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
604           } catch (Exception exc) {
605             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
606           }
607
608           if (link != null) {
609             NetworkTransaction n2 = new NetworkTransaction();
610             n2.setLink(link);
611             n2.setEntityType(txn.getEntityType());
612             n2.setDescriptor(txn.getDescriptor());
613             n2.setOperationType(HttpMethod.GET);
614
615             esWorkOnHand.incrementAndGet();
616
617             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
618                 .whenComplete((result, error) -> {
619
620                   esWorkOnHand.decrementAndGet();
621
622                   if (error != null) {
623                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
624                   } else {
625                     updateElasticSearchCounters(result);
626                     performDocumentUpsert(result, se);
627                   }
628                 });
629           }
630         }
631
632       } 
633     } catch (JsonProcessingException exc) {
634       LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing error while fetching document for elasticsearch update.  Error: " + exc.getMessage() );
635     } catch (IOException exc) {
636       LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing error while fetching document for elasticsearch update.  Error: " + exc.getMessage() );
637     }
638   }
639
640   /**
641    * Process store document result.
642    *
643    * @param esPutResult the es put result
644    * @param esGetResult the es get result
645    * @param se the se
646    */
647   private void processStoreDocumentResult(NetworkTransaction esPutResult,
648       NetworkTransaction esGetResult, SearchableEntity se) {
649
650     OperationResult or = esPutResult.getOperationResult();
651
652     if (!or.wasSuccessful()) {
653       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
654
655         if (shouldAllowRetry(se.getId())) {
656           esWorkOnHand.incrementAndGet();
657
658           RetrySearchableEntitySyncContainer rsc =
659               new RetrySearchableEntitySyncContainer(esGetResult, se);
660           retryQueue.push(rsc);
661
662           String message = "Store document failed during searchable entity synchronization"
663               + " due to version conflict. Entity will be re-synced.";
664           LOG.warn(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
665         }
666       } else {
667         String message =
668             "Store document failed during searchable entity synchronization with result code "
669                 + or.getResultCode() + " and result message " + or.getResult();
670         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
671       }
672     }
673   }
674
675   /**
676    * Perform retry sync.
677    */
678   private void performRetrySync() {
679     while (retryQueue.peek() != null) {
680
681       RetrySearchableEntitySyncContainer rsc = retryQueue.poll();
682       if (rsc != null) {
683
684         SearchableEntity se = rsc.getSearchableEntity();
685         NetworkTransaction txn = rsc.getNetworkTransaction();
686
687         String link = null;
688         try {
689           /*
690            * In this retry flow the se object has already derived its fields
691            */
692           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), se.getId());
693         } catch (Exception exc) {
694           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
695         }
696
697         if (link != null) {
698           NetworkTransaction retryTransaction = new NetworkTransaction();
699           retryTransaction.setLink(link);
700           retryTransaction.setEntityType(txn.getEntityType());
701           retryTransaction.setDescriptor(txn.getDescriptor());
702           retryTransaction.setOperationType(HttpMethod.GET);
703
704           /*
705            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
706            * called incrementAndGet when queuing the failed PUT!
707            */
708
709           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
710               esExecutor).whenComplete((result, error) -> {
711
712                 esWorkOnHand.decrementAndGet();
713
714                 if (error != null) {
715                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
716                 } else {
717                   updateElasticSearchCounters(result);
718                   performDocumentUpsert(result, se);
719                 }
720               });
721         }
722
723       }
724     }
725   }
726
727   /**
728    * Should allow retry.
729    *
730    * @param id the id
731    * @return true, if successful
732    */
733   private boolean shouldAllowRetry(String id) {
734     boolean isRetryAllowed = true;
735     if (retryLimitTracker.get(id) != null) {
736       Integer currentCount = retryLimitTracker.get(id);
737       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
738         isRetryAllowed = false;
739         String message = "Searchable entity re-sync limit reached for " + id
740             + ", re-sync will no longer be attempted for this entity";
741         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
742       } else {
743         Integer newCount = new Integer(currentCount.intValue() + 1);
744         retryLimitTracker.put(id, newCount);
745       }
746     } else {
747       Integer firstRetryCount = new Integer(1);
748       retryLimitTracker.put(id, firstRetryCount);
749     }
750
751     return isRetryAllowed;
752   }
753
754   @Override
755   public SynchronizerState getState() {
756     if (!isSyncDone()) {
757       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
758     }
759
760     return SynchronizerState.IDLE;
761
762   }
763
764   /* (non-Javadoc)
765    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
766    */
767   @Override
768   public String getStatReport(boolean showFinalReport) {
769     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
770     return this.getStatReport(syncDurationInMs, showFinalReport);
771   }
772
773   /* (non-Javadoc)
774    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
775    */
776   @Override
777   public void shutdown() {
778     this.shutdownExecutors();
779   }
780
781   @Override
782   protected boolean isSyncDone() {
783     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
784
785     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
786       return false;
787     }
788
789     return true;
790   }
791
792 }