Fix potential null pointer places
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.autosuggestion.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.Deque;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedDeque;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.Supplier;
40
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.restclient.client.OperationResult;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
48 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
49 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
50 import org.onap.aai.sparky.dal.NetworkTransaction;
51 import org.onap.aai.sparky.dal.rest.HttpMethod;
52 import org.onap.aai.sparky.logging.AaiUiMsgs;
53 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
54 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
55 import org.onap.aai.sparky.sync.IndexSynchronizer;
56 import org.onap.aai.sparky.sync.SynchronizerConstants;
57 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
58 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
59 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
60 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
61 import org.onap.aai.sparky.sync.enumeration.OperationState;
62 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
63 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
64 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
65 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
66 import org.onap.aai.sparky.util.NodeUtils;
67 import org.onap.aai.sparky.util.SuggestionsPermutation;
68 import org.slf4j.MDC;
69
70 import com.fasterxml.jackson.core.JsonProcessingException;
71 import com.fasterxml.jackson.databind.JsonNode;
72 import com.fasterxml.jackson.databind.node.ArrayNode;
73
74 /**
75  * The Class AutosuggestionSynchronizer.
76  */
77 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
78     implements IndexSynchronizer {
79
80   private class RetrySuggestionEntitySyncContainer {
81     NetworkTransaction txn;
82     SuggestionSearchEntity ssec;
83
84     /**
85      * Instantiates a new RetrySuggestionEntitySyncContainer.
86      *
87      * @param txn the txn
88      * @param icer the icer
89      */
90     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
91       this.txn = txn;
92       this.ssec = icer;
93     }
94
95     public NetworkTransaction getNetworkTransaction() {
96       return txn;
97     }
98
99     public SuggestionSearchEntity getSuggestionSearchEntity() {
100       return ssec;
101     }
102   }
103
104   private static final Logger LOG =
105       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
106   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
111   private boolean syncInProgress;
112   private Map<String, String> contextMap;
113   protected ExecutorService esPutExecutor;
114   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
115   private Map<String, Integer> retryLimitTracker;
116   private OxmEntityLookup oxmEntityLookup;
117   private SuggestionEntityLookup suggestionEntityLookup;
118   private FiltersConfig filtersConfig;
119
120   /**
121    * Instantiates a new historical entity summarizer.
122    *
123    * @throws Exception the exception
124    */
125   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
126       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
127       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
128       SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig) throws Exception {
129
130     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
131         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
132     
133     this.oxmEntityLookup = oxmEntityLookup;
134     this.suggestionEntityLookup = suggestionEntityLookup;
135     this.allWorkEnumerated = false;
136     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
137     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
138     this.synchronizerName = "Autosuggestion Entity Synchronizer";
139     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
140     this.syncInProgress = false;
141     this.contextMap = MDC.getCopyOfContextMap();
142     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
143     this.retryQueue = new ConcurrentLinkedDeque<RetrySuggestionEntitySyncContainer>();
144     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
145     this.syncDurationInMs = -1;
146     this.filtersConfig = filtersConfig;
147   }
148
149   /**
150    * Collect all the work.
151    *
152    * @return the operation state
153    */
154   private OperationState collectAllTheWork() {
155     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
156     Map<String, SuggestionEntityDescriptor> descriptorMap =
157         suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
158
159     if (descriptorMap.isEmpty()) {
160       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
161       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
162       return OperationState.ERROR;
163     }
164
165     Collection<String> syncTypes = descriptorMap.keySet();
166
167     try {
168
169       /*
170        * launch a parallel async thread to process the documents for each entity-type (to max the of
171        * the configured executor anyway)
172        */
173
174       aaiWorkOnHand.set(syncTypes.size());
175
176       for (String key : syncTypes) {
177
178         supplyAsync(new Supplier<Void>() {
179
180           @Override
181           public Void get() {
182             MDC.setContextMap(contextMap);
183             OperationResult typeLinksResult = null;
184             try {
185               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
186               aaiWorkOnHand.decrementAndGet();
187               processEntityTypeSelfLinks(typeLinksResult);
188             } catch (Exception exc) {
189               LOG.error(AaiUiMsgs.ERROR_GENERIC,
190                   "An error occurred while processing entity self-links. Error: "
191                       + exc.getMessage());
192             }
193
194             return null;
195           }
196
197         }, aaiExecutor).whenComplete((result, error) -> {
198
199           if (error != null) {
200             LOG.error(AaiUiMsgs.ERROR_GENERIC,
201                 "An error occurred getting data from AAI. Error = " + error.getMessage());
202           }
203         });
204
205       }
206
207       while (aaiWorkOnHand.get() != 0) {
208
209         if (LOG.isDebugEnabled()) {
210           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
211         }
212
213         Thread.sleep(1000);
214       }
215
216       aaiWorkOnHand.set(selflinks.size());
217       allWorkEnumerated = true;
218       syncEntityTypes();
219
220       while (!isSyncDone()) {
221         performRetrySync();
222         Thread.sleep(1000);
223       }
224
225       /*
226        * Make sure we don't hang on to retries that failed which could cause issues during future
227        * syncs
228        */
229       retryLimitTracker.clear();
230
231     } catch (Exception exc) {
232       LOG.error(AaiUiMsgs.ERROR_GENERIC,
233           "An error occurred while performing the sync.  Error: " + exc.getMessage());
234     }
235
236     return OperationState.OK;
237
238   }
239
240   /*
241    * (non-Javadoc)
242    * 
243    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
244    */
245   @Override
246   public OperationState doSync() {
247     this.syncDurationInMs = -1;
248     syncStartedTimeStampInMs = System.currentTimeMillis();
249     String txnID = NodeUtils.getRandomTxnId();
250     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
251
252     return collectAllTheWork();
253   }
254
255   /**
256    * Process entity type self links.
257    *
258    * @param operationResult the operation result
259    */
260   private void processEntityTypeSelfLinks(OperationResult operationResult) {
261
262     if (operationResult == null) {
263       return;
264     }
265
266     final String jsonResult = operationResult.getResult();
267
268     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
269
270       try {
271         JsonNode rootNode = mapper.readTree(jsonResult);
272         JsonNode resultData = rootNode.get("result-data");
273
274         if (resultData.isArray()) {
275           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
276
277           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
278
279           while (elementIterator.hasNext()) {
280             JsonNode element = elementIterator.next();
281
282             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
283             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
284
285             if (resourceType != null && resourceLink != null) {
286
287               OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
288
289               if (descriptor == null) {
290                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
291                 // go to next element in iterator
292                 continue;
293               }
294               selflinks.add(new SelfLinkDescriptor(resourceLink,
295                       SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
296             }
297           }
298         }
299       } catch (IOException exc) {
300         String message = "Could not deserialize JSON (representing operation result) as node tree. "
301                 + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
302         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
303       }
304     }
305   }
306
307   /**
308    * Sync entity types.
309    */
310   private void syncEntityTypes() {
311
312     while (selflinks.peek() != null) {
313
314       SelfLinkDescriptor linkDescriptor = selflinks.poll();
315       aaiWorkOnHand.decrementAndGet();
316
317       OxmEntityDescriptor descriptor = null;
318
319       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
320
321         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
322
323         if (descriptor == null) {
324           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
325           // go to next element in iterator
326           continue;
327         }
328
329         NetworkTransaction txn = new NetworkTransaction();
330         txn.setDescriptor(descriptor);
331         txn.setLink(linkDescriptor.getSelfLink());
332         txn.setOperationType(HttpMethod.GET);
333         txn.setEntityType(linkDescriptor.getEntityType());
334
335         aaiWorkOnHand.incrementAndGet();
336
337         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
338             .whenComplete((result, error) -> {
339
340               aaiWorkOnHand.decrementAndGet();
341
342               if (error != null) {
343                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
344               } else {
345                 if (result == null) {
346                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
347                       linkDescriptor.getSelfLink());
348                 } else {
349                   updateActiveInventoryCounters(result);
350                   fetchDocumentForUpsert(result);
351                 }
352               }
353             });
354       }
355
356     }
357
358   }
359
360   /*
361    * Return a set of valid suggestion attributes for the provided entityName that are present in the
362    * JSON
363    * 
364    * @param node JSON node in which the attributes should be found
365    * 
366    * @param entityName Name of the entity
367    * 
368    * @return List of all valid suggestion attributes(key's)
369    */
370   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
371     List<String> suggestableAttr = new ArrayList<String>();
372
373     HashMap<String, String> desc =
374         suggestionEntityLookup.getSuggestionSearchEntityOxmModel().get(entityName);
375
376     if (desc != null) {
377
378       String attr = desc.get("suggestibleAttributes");
379
380       if (attr != null) {
381         suggestableAttr = Arrays.asList(attr.split(","));
382         List<String> suggestableValue = new ArrayList<>();
383         for (String attribute : suggestableAttr) {
384           if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
385             suggestableValue.add(attribute);
386           }
387         }
388         return suggestableValue;
389       }
390     }
391
392     return new ArrayList<>();
393   }
394
395   /**
396    * Fetch all the documents for upsert. Based on the number of permutations that are available the
397    * number of documents will be different
398    *
399    * @param txn the txn
400    */
401   private void fetchDocumentForUpsert(NetworkTransaction txn) {
402     if (!txn.getOperationResult().wasSuccessful()) {
403       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
404       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
405       return;
406     }
407     try {
408       final String jsonResult = txn.getOperationResult().getResult();
409
410       if (jsonResult != null && jsonResult.length() > 0) {
411
412         // Step 1: Calculate the number of possible permutations of attributes
413         String entityName = txn.getDescriptor().getEntityName();
414         JsonNode entityNode = mapper.readTree(jsonResult);
415
416         List<String> availableSuggestableAttrName =
417             getSuggestableAttrNamesFromReponse(entityNode, entityName);
418         
419         ArrayList<ArrayList<String>> uniqueLists =
420             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
421         // Now we have a list of all possible permutations for the status that are
422         // defined for this entity type. Try inserting a document for every combination.
423         for (ArrayList<String> uniqueList : uniqueLists) {
424
425           SuggestionSearchEntity sse = new SuggestionSearchEntity(filtersConfig, suggestionEntityLookup);
426           sse.setSuggestableAttr(uniqueList);
427           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
428           sse.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
429           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
430           // The unique id for the document will be created at derive fields
431           sse.deriveFields();
432           // Insert the document only if it has valid statuses
433           if (sse.isSuggestableDoc()) {
434             String link = null;
435             try {
436               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
437             } catch (Exception exc) {
438               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
439             }
440
441             if (link != null) {
442               NetworkTransaction n2 = new NetworkTransaction();
443               n2.setLink(link);
444               n2.setEntityType(txn.getEntityType());
445               n2.setDescriptor(txn.getDescriptor());
446               n2.setOperationType(HttpMethod.GET);
447
448               esWorkOnHand.incrementAndGet();
449
450               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
451                   .whenComplete((result, error) -> {
452
453                     esWorkOnHand.decrementAndGet();
454
455                     if (error != null) {
456                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
457                     } else {
458                       updateElasticSearchCounters(result);
459                       performDocumentUpsert(result, sse);
460                     }
461                   });
462             }
463           }
464         }
465       }
466     } catch (JsonProcessingException exc) {
467         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a json processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
468     } catch (IOException exc) {
469         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a io processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
470     }
471   }
472
473   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
474       NetworkTransaction txn) throws JsonProcessingException, IOException {
475
476     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
477
478     sse.setEntityType(resultDescriptor.getEntityName());
479
480     JsonNode entityNode = mapper.readTree(result);
481
482     List<String> primaryKeyValues = new ArrayList<String>();
483     String pkeyValue = null;
484
485     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
486       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
487       if (pkeyValue != null) {
488         primaryKeyValues.add(pkeyValue);
489       } else {
490         String message = "populateSuggestionSearchEntityDocument(),"
491             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
492         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
493       }
494     }
495
496     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
497     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
498     sse.generateSuggestionInputPermutations();
499   }
500
501   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
502     /**
503      * <p>
504      * <ul>
505      * As part of the response processing we need to do the following:
506      * <li>1. Extract the version (if present), it will be the ETAG when we use the
507      * Search-Abstraction-Service
508      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
509      * tag
510      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
511      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
512      * </ul>
513      * </p>
514      */
515     String link = null;
516     try {
517       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
518     } catch (Exception exc) {
519       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
520       return;
521     }
522
523     boolean wasEntryDiscovered = false;
524     if (esGetTxn.getOperationResult().getResultCode() == 404) {
525       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
526     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
527       wasEntryDiscovered = true;
528     } else {
529       /*
530        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
531        * found TODO -> Should we return.
532        */
533       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
534           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
535       return;
536     }
537     // Insert a new document only if the paylod is different.
538     // This is determined by hashing the payload and using it as a id for the document
539     //
540     if (!wasEntryDiscovered) {
541       try {
542         String jsonPayload = null;
543
544         jsonPayload = sse.getAsJson();
545         if (link != null && jsonPayload != null) {
546
547           NetworkTransaction updateElasticTxn = new NetworkTransaction();
548           updateElasticTxn.setLink(link);
549           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
550           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
551           updateElasticTxn.setOperationType(HttpMethod.PUT);
552
553           esWorkOnHand.incrementAndGet();
554           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
555               esPutExecutor).whenComplete((result, error) -> {
556
557                 esWorkOnHand.decrementAndGet();
558
559                 if (error != null) {
560                   String message = "Suggestion search entity sync UPDATE PUT error - "
561                       + error.getLocalizedMessage();
562                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
563                 } else {
564                   updateElasticSearchCounters(result);
565                   processStoreDocumentResult(result, esGetTxn, sse);
566                 }
567               });
568         }
569       } catch (Exception exc) {
570         String message =
571             "Exception caught during suggestion search entity sync PUT operation. Message - "
572                 + exc.getLocalizedMessage();
573         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
574       }
575     }
576   }
577
578   private void processStoreDocumentResult(NetworkTransaction esPutResult,
579       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
580
581     OperationResult or = esPutResult.getOperationResult();
582
583     if (!or.wasSuccessful()) {
584       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
585
586         if (shouldAllowRetry(sse.getId())) {
587           esWorkOnHand.incrementAndGet();
588
589           RetrySuggestionEntitySyncContainer rssec =
590               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
591           retryQueue.push(rssec);
592
593           String message = "Store document failed during suggestion search entity synchronization"
594               + " due to version conflict. Entity will be re-synced.";
595           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
596         }
597       } else {
598         String message =
599             "Store document failed during suggestion search entity synchronization with result code "
600                 + or.getResultCode() + " and result message " + or.getResult();
601         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
602       }
603     }
604   }
605
606   /**
607    * Perform retry sync.
608    */
609   private void performRetrySync() {
610     while (retryQueue.peek() != null) {
611
612       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
613       if (susc != null) {
614
615         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
616         NetworkTransaction txn = susc.getNetworkTransaction();
617
618         String link = null;
619         try {
620           /*
621            * In this retry flow the se object has already derived its fields
622            */
623           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
624         } catch (Exception exc) {
625           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
626         }
627
628         if (link != null) {
629           NetworkTransaction retryTransaction = new NetworkTransaction();
630           retryTransaction.setLink(link);
631           retryTransaction.setEntityType(txn.getEntityType());
632           retryTransaction.setDescriptor(txn.getDescriptor());
633           retryTransaction.setOperationType(HttpMethod.GET);
634
635           /*
636            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
637            * called incrementAndGet when queuing the failed PUT!
638            */
639
640           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
641               esExecutor).whenComplete((result, error) -> {
642
643                 esWorkOnHand.decrementAndGet();
644
645                 if (error != null) {
646                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
647                 } else {
648                   updateElasticSearchCounters(result);
649                   performDocumentUpsert(result, sus);
650                 }
651               });
652         }
653
654       }
655     }
656   }
657
658   /**
659    * Should allow retry.
660    *
661    * @param id the id
662    * @return true, if successful
663    */
664   private boolean shouldAllowRetry(String id) {
665     boolean isRetryAllowed = true;
666     if (retryLimitTracker.get(id) != null) {
667       Integer currentCount = retryLimitTracker.get(id);
668       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
669         isRetryAllowed = false;
670         String message = "Searchable entity re-sync limit reached for " + id
671             + ", re-sync will no longer be attempted for this entity";
672         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
673       } else {
674         Integer newCount = new Integer(currentCount.intValue() + 1);
675         retryLimitTracker.put(id, newCount);
676       }
677     } else {
678       Integer firstRetryCount = new Integer(1);
679       retryLimitTracker.put(id, firstRetryCount);
680     }
681
682     return isRetryAllowed;
683   }
684
685
686
687   @Override
688   public SynchronizerState getState() {
689
690     if (!isSyncDone()) {
691       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
692     }
693
694     return SynchronizerState.IDLE;
695
696   }
697
698   /*
699    * (non-Javadoc)
700    * 
701    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
702    */
703   @Override
704   public String getStatReport(boolean showFinalReport) {
705     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
706     return getStatReport(syncDurationInMs, showFinalReport);
707   }
708
709   /*
710    * (non-Javadoc)
711    * 
712    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
713    */
714   @Override
715   public void shutdown() {
716     this.shutdownExecutors();
717   }
718
719   @Override
720   protected boolean isSyncDone() {
721
722     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
723
724     if (LOG.isDebugEnabled()) {
725       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
726           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
727     }
728
729     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
730       return false;
731     }
732
733     this.syncInProgress = false;
734
735     return true;
736   }
737
738   /*
739    * (non-Javadoc)
740    * 
741    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
742    */
743   @Override
744   public void clearCache() {
745
746     if (syncInProgress) {
747       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
748           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
749       return;
750     }
751
752     super.clearCache();
753     this.resetCounters();
754     if (entityCounters != null) {
755       entityCounters.clear();
756     }
757
758     allWorkEnumerated = false;
759
760   }
761
762 }