Adding interfaces in documentation
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.autosuggestion.sync;
26
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
28
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Deque;
34 import java.util.EnumSet;
35 import java.util.HashMap;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedDeque;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.function.Supplier;
44
45 import org.onap.aai.cl.api.Logger;
46 import org.onap.aai.cl.eelf.LoggerFactory;
47 import org.onap.aai.cl.mdc.MdcContext;
48 import org.onap.aai.restclient.client.OperationResult;
49 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
50 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
51 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
52 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
53 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
54 import org.onap.aai.sparky.dal.NetworkTransaction;
55 import org.onap.aai.sparky.dal.rest.HttpMethod;
56 import org.onap.aai.sparky.logging.AaiUiMsgs;
57 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
58 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
59 import org.onap.aai.sparky.sync.IndexSynchronizer;
60 import org.onap.aai.sparky.sync.SynchronizerConstants;
61 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
62 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
63 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
64 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
65 import org.onap.aai.sparky.sync.enumeration.OperationState;
66 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
67 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
68 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
69 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
70 import org.onap.aai.sparky.util.NodeUtils;
71 import org.onap.aai.sparky.util.SuggestionsPermutation;
72 import org.slf4j.MDC;
73
74 import com.fasterxml.jackson.core.JsonProcessingException;
75 import com.fasterxml.jackson.databind.JsonNode;
76 import com.fasterxml.jackson.databind.node.ArrayNode;
77
78 /**
79  * The Class AutosuggestionSynchronizer.
80  */
81 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
82     implements IndexSynchronizer {
83
84   private class RetrySuggestionEntitySyncContainer {
85     NetworkTransaction txn;
86     SuggestionSearchEntity ssec;
87
88     /**
89      * Instantiates a new RetrySuggestionEntitySyncContainer.
90      *
91      * @param txn the txn
92      * @param icer the icer
93      */
94     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
95       this.txn = txn;
96       this.ssec = icer;
97     }
98
99     public NetworkTransaction getNetworkTransaction() {
100       return txn;
101     }
102
103     public SuggestionSearchEntity getSuggestionSearchEntity() {
104       return ssec;
105     }
106   }
107
108   private static final Logger LOG =
109       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
110   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
111
112   private boolean allWorkEnumerated;
113   private Deque<SelfLinkDescriptor> selflinks;
114   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
115   private boolean syncInProgress;
116   private Map<String, String> contextMap;
117   protected ExecutorService esPutExecutor;
118   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
119   private Map<String, Integer> retryLimitTracker;
120   private OxmEntityLookup oxmEntityLookup;
121   private SuggestionEntityLookup suggestionEntityLookup;
122   private FiltersConfig filtersConfig;
123
124   /**
125    * Instantiates a new historical entity summarizer.
126    *
127    * @param indexName the index name
128    * @throws Exception the exception
129    */
130   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
131       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
132       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
133       SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig) throws Exception {
134
135     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
136         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
137     
138     this.oxmEntityLookup = oxmEntityLookup;
139     this.suggestionEntityLookup = suggestionEntityLookup;
140     this.allWorkEnumerated = false;
141     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
142     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
143     this.synchronizerName = "Autosuggestion Entity Synchronizer";
144     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
145     this.syncInProgress = false;
146     this.contextMap = MDC.getCopyOfContextMap();
147     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
148     this.retryQueue = new ConcurrentLinkedDeque<RetrySuggestionEntitySyncContainer>();
149     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
150     this.syncDurationInMs = -1;
151     this.filtersConfig = filtersConfig;
152   }
153
154   /**
155    * Collect all the work.
156    *
157    * @return the operation state
158    */
159   private OperationState collectAllTheWork() {
160     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
161     Map<String, SuggestionEntityDescriptor> descriptorMap =
162         suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
163
164     if (descriptorMap.isEmpty()) {
165       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
166       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
167       return OperationState.ERROR;
168     }
169
170     Collection<String> syncTypes = descriptorMap.keySet();
171
172     try {
173
174       /*
175        * launch a parallel async thread to process the documents for each entity-type (to max the of
176        * the configured executor anyway)
177        */
178
179       aaiWorkOnHand.set(syncTypes.size());
180
181       for (String key : syncTypes) {
182
183         supplyAsync(new Supplier<Void>() {
184
185           @Override
186           public Void get() {
187             MDC.setContextMap(contextMap);
188             OperationResult typeLinksResult = null;
189             try {
190               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
191               aaiWorkOnHand.decrementAndGet();
192               processEntityTypeSelfLinks(typeLinksResult);
193             } catch (Exception exc) {
194               LOG.error(AaiUiMsgs.ERROR_GENERIC,
195                   "An error occurred while processing entity self-links. Error: "
196                       + exc.getMessage());
197             }
198
199             return null;
200           }
201
202         }, aaiExecutor).whenComplete((result, error) -> {
203
204           if (error != null) {
205             LOG.error(AaiUiMsgs.ERROR_GENERIC,
206                 "An error occurred getting data from AAI. Error = " + error.getMessage());
207           }
208         });
209
210       }
211
212       while (aaiWorkOnHand.get() != 0) {
213
214         if (LOG.isDebugEnabled()) {
215           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
216         }
217
218         Thread.sleep(1000);
219       }
220
221       aaiWorkOnHand.set(selflinks.size());
222       allWorkEnumerated = true;
223       syncEntityTypes();
224
225       while (!isSyncDone()) {
226         performRetrySync();
227         Thread.sleep(1000);
228       }
229
230       /*
231        * Make sure we don't hang on to retries that failed which could cause issues during future
232        * syncs
233        */
234       retryLimitTracker.clear();
235
236     } catch (Exception exc) {
237       LOG.error(AaiUiMsgs.ERROR_GENERIC,
238           "An error occurred while performing the sync.  Error: " + exc.getMessage());
239     }
240
241     return OperationState.OK;
242
243   }
244
245   /*
246    * (non-Javadoc)
247    * 
248    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
249    */
250   @Override
251   public OperationState doSync() {
252     this.syncDurationInMs = -1;
253     syncStartedTimeStampInMs = System.currentTimeMillis();
254     String txnID = NodeUtils.getRandomTxnId();
255     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
256
257     return collectAllTheWork();
258   }
259
260   /**
261    * Process entity type self links.
262    *
263    * @param operationResult the operation result
264    */
265   private void processEntityTypeSelfLinks(OperationResult operationResult) {
266
267     JsonNode rootNode = null;
268     
269     if ( operationResult == null ) {
270         return;
271     }
272
273     final String jsonResult = operationResult.getResult();
274
275     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
276
277       try {
278         rootNode = mapper.readTree(jsonResult);
279       } catch (IOException exc) {
280         String message = "Could not deserialize JSON (representing operation result) as node tree. "
281             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
282         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
283       }
284
285       JsonNode resultData = rootNode.get("result-data");
286       ArrayNode resultDataArrayNode = null;
287
288       if (resultData.isArray()) {
289         resultDataArrayNode = (ArrayNode) resultData;
290
291         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
292         JsonNode element = null;
293
294         while (elementIterator.hasNext()) {
295           element = elementIterator.next();
296
297           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
298           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
299
300           OxmEntityDescriptor descriptor = null;
301
302           if (resourceType != null && resourceLink != null) {
303
304             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
305
306             if (descriptor == null) {
307               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
308               // go to next element in iterator
309               continue;
310             }
311             selflinks.add(new SelfLinkDescriptor(resourceLink,
312                 SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
313
314
315           }
316         }
317       }
318     }
319   }
320
321   /**
322    * Sync entity types.
323    */
324   private void syncEntityTypes() {
325
326     while (selflinks.peek() != null) {
327
328       SelfLinkDescriptor linkDescriptor = selflinks.poll();
329       aaiWorkOnHand.decrementAndGet();
330
331       OxmEntityDescriptor descriptor = null;
332
333       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
334
335         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
336
337         if (descriptor == null) {
338           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
339           // go to next element in iterator
340           continue;
341         }
342
343         NetworkTransaction txn = new NetworkTransaction();
344         txn.setDescriptor(descriptor);
345         txn.setLink(linkDescriptor.getSelfLink());
346         txn.setOperationType(HttpMethod.GET);
347         txn.setEntityType(linkDescriptor.getEntityType());
348
349         aaiWorkOnHand.incrementAndGet();
350
351         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
352             .whenComplete((result, error) -> {
353
354               aaiWorkOnHand.decrementAndGet();
355
356               if (error != null) {
357                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
358               } else {
359                 if (result == null) {
360                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
361                       linkDescriptor.getSelfLink());
362                 } else {
363                   updateActiveInventoryCounters(result);
364                   fetchDocumentForUpsert(result);
365                 }
366               }
367             });
368       }
369
370     }
371
372   }
373
374   /*
375    * Return a set of valid suggestion attributes for the provided entityName that are present in the
376    * JSON
377    * 
378    * @param node JSON node in which the attributes should be found
379    * 
380    * @param entityName Name of the entity
381    * 
382    * @return List of all valid suggestion attributes(key's)
383    */
384   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
385     List<String> suggestableAttr = new ArrayList<String>();
386
387     HashMap<String, String> desc =
388         suggestionEntityLookup.getSuggestionSearchEntityOxmModel().get(entityName);
389
390     if (desc != null) {
391
392       String attr = desc.get("suggestibleAttributes");
393
394       if (attr != null) {
395         suggestableAttr = Arrays.asList(attr.split(","));
396         List<String> suggestableValue = new ArrayList<String>();
397         for (String attribute : suggestableAttr) {
398           if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
399             suggestableValue.add(attribute);
400           }
401         }
402         return suggestableValue;
403       }
404     }
405
406     return new ArrayList<String>();
407   }
408
409   /**
410    * Fetch all the documents for upsert. Based on the number of permutations that are available the
411    * number of documents will be different
412    *
413    * @param txn the txn
414    */
415   private void fetchDocumentForUpsert(NetworkTransaction txn) {
416     if (!txn.getOperationResult().wasSuccessful()) {
417       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
418       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
419       return;
420     }
421     try {
422       final String jsonResult = txn.getOperationResult().getResult();
423
424       if (jsonResult != null && jsonResult.length() > 0) {
425
426         // Step 1: Calculate the number of possible permutations of attributes
427         String entityName = txn.getDescriptor().getEntityName();
428         JsonNode entityNode = mapper.readTree(jsonResult);
429
430         List<String> availableSuggestableAttrName =
431             getSuggestableAttrNamesFromReponse(entityNode, entityName);
432         
433         ArrayList<ArrayList<String>> uniqueLists =
434             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
435         // Now we have a list of all possible permutations for the status that are
436         // defined for this entity type. Try inserting a document for every combination.
437         for (ArrayList<String> uniqueList : uniqueLists) {
438
439           SuggestionSearchEntity sse = new SuggestionSearchEntity(filtersConfig, suggestionEntityLookup);
440           sse.setSuggestableAttr(uniqueList);
441           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
442           sse.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
443           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
444           // The unique id for the document will be created at derive fields
445           sse.deriveFields();
446           // Insert the document only if it has valid statuses
447           if (sse.isSuggestableDoc()) {
448             String link = null;
449             try {
450               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
451             } catch (Exception exc) {
452               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
453             }
454
455             if (link != null) {
456               NetworkTransaction n2 = new NetworkTransaction();
457               n2.setLink(link);
458               n2.setEntityType(txn.getEntityType());
459               n2.setDescriptor(txn.getDescriptor());
460               n2.setOperationType(HttpMethod.GET);
461
462               esWorkOnHand.incrementAndGet();
463
464               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
465                   .whenComplete((result, error) -> {
466
467                     esWorkOnHand.decrementAndGet();
468
469                     if (error != null) {
470                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
471                     } else {
472                       updateElasticSearchCounters(result);
473                       performDocumentUpsert(result, sse);
474                     }
475                   });
476             }
477           }
478         }
479       }
480     } catch (JsonProcessingException exc) {
481         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a json processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
482     } catch (IOException exc) {
483         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a io processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
484     }
485   }
486
487   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
488       NetworkTransaction txn) throws JsonProcessingException, IOException {
489
490     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
491
492     sse.setEntityType(resultDescriptor.getEntityName());
493
494     JsonNode entityNode = mapper.readTree(result);
495
496     List<String> primaryKeyValues = new ArrayList<String>();
497     String pkeyValue = null;
498
499     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
500       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
501       if (pkeyValue != null) {
502         primaryKeyValues.add(pkeyValue);
503       } else {
504         String message = "populateSuggestionSearchEntityDocument(),"
505             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
506         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
507       }
508     }
509
510     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
511     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
512     sse.generateSuggestionInputPermutations();
513   }
514
515   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
516     /**
517      * <p>
518      * <ul>
519      * As part of the response processing we need to do the following:
520      * <li>1. Extract the version (if present), it will be the ETAG when we use the
521      * Search-Abstraction-Service
522      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
523      * tag
524      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
525      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
526      * </ul>
527      * </p>
528      */
529     String link = null;
530     try {
531       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
532     } catch (Exception exc) {
533       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
534       return;
535     }
536
537     boolean wasEntryDiscovered = false;
538     if (esGetTxn.getOperationResult().getResultCode() == 404) {
539       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
540     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
541       wasEntryDiscovered = true;
542     } else {
543       /*
544        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
545        * found TODO -> Should we return.
546        */
547       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
548           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
549       return;
550     }
551     // Insert a new document only if the paylod is different.
552     // This is determined by hashing the payload and using it as a id for the document
553     //
554     if (!wasEntryDiscovered) {
555       try {
556         String jsonPayload = null;
557
558         jsonPayload = sse.getAsJson();
559         if (link != null && jsonPayload != null) {
560
561           NetworkTransaction updateElasticTxn = new NetworkTransaction();
562           updateElasticTxn.setLink(link);
563           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
564           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
565           updateElasticTxn.setOperationType(HttpMethod.PUT);
566
567           esWorkOnHand.incrementAndGet();
568           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
569               esPutExecutor).whenComplete((result, error) -> {
570
571                 esWorkOnHand.decrementAndGet();
572
573                 if (error != null) {
574                   String message = "Suggestion search entity sync UPDATE PUT error - "
575                       + error.getLocalizedMessage();
576                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
577                 } else {
578                   updateElasticSearchCounters(result);
579                   processStoreDocumentResult(result, esGetTxn, sse);
580                 }
581               });
582         }
583       } catch (Exception exc) {
584         String message =
585             "Exception caught during suggestion search entity sync PUT operation. Message - "
586                 + exc.getLocalizedMessage();
587         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
588       }
589     }
590   }
591
592   private void processStoreDocumentResult(NetworkTransaction esPutResult,
593       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
594
595     OperationResult or = esPutResult.getOperationResult();
596
597     if (!or.wasSuccessful()) {
598       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
599
600         if (shouldAllowRetry(sse.getId())) {
601           esWorkOnHand.incrementAndGet();
602
603           RetrySuggestionEntitySyncContainer rssec =
604               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
605           retryQueue.push(rssec);
606
607           String message = "Store document failed during suggestion search entity synchronization"
608               + " due to version conflict. Entity will be re-synced.";
609           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
610         }
611       } else {
612         String message =
613             "Store document failed during suggestion search entity synchronization with result code "
614                 + or.getResultCode() + " and result message " + or.getResult();
615         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
616       }
617     }
618   }
619
620   /**
621    * Perform retry sync.
622    */
623   private void performRetrySync() {
624     while (retryQueue.peek() != null) {
625
626       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
627       if (susc != null) {
628
629         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
630         NetworkTransaction txn = susc.getNetworkTransaction();
631
632         String link = null;
633         try {
634           /*
635            * In this retry flow the se object has already derived its fields
636            */
637           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
638         } catch (Exception exc) {
639           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
640         }
641
642         if (link != null) {
643           NetworkTransaction retryTransaction = new NetworkTransaction();
644           retryTransaction.setLink(link);
645           retryTransaction.setEntityType(txn.getEntityType());
646           retryTransaction.setDescriptor(txn.getDescriptor());
647           retryTransaction.setOperationType(HttpMethod.GET);
648
649           /*
650            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
651            * called incrementAndGet when queuing the failed PUT!
652            */
653
654           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
655               esExecutor).whenComplete((result, error) -> {
656
657                 esWorkOnHand.decrementAndGet();
658
659                 if (error != null) {
660                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
661                 } else {
662                   updateElasticSearchCounters(result);
663                   performDocumentUpsert(result, sus);
664                 }
665               });
666         }
667
668       }
669     }
670   }
671
672   /**
673    * Should allow retry.
674    *
675    * @param id the id
676    * @return true, if successful
677    */
678   private boolean shouldAllowRetry(String id) {
679     boolean isRetryAllowed = true;
680     if (retryLimitTracker.get(id) != null) {
681       Integer currentCount = retryLimitTracker.get(id);
682       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
683         isRetryAllowed = false;
684         String message = "Searchable entity re-sync limit reached for " + id
685             + ", re-sync will no longer be attempted for this entity";
686         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
687       } else {
688         Integer newCount = new Integer(currentCount.intValue() + 1);
689         retryLimitTracker.put(id, newCount);
690       }
691     } else {
692       Integer firstRetryCount = new Integer(1);
693       retryLimitTracker.put(id, firstRetryCount);
694     }
695
696     return isRetryAllowed;
697   }
698
699
700
701   @Override
702   public SynchronizerState getState() {
703
704     if (!isSyncDone()) {
705       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
706     }
707
708     return SynchronizerState.IDLE;
709
710   }
711
712   /*
713    * (non-Javadoc)
714    * 
715    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
716    */
717   @Override
718   public String getStatReport(boolean showFinalReport) {
719     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
720     return getStatReport(syncDurationInMs, showFinalReport);
721   }
722
723   /*
724    * (non-Javadoc)
725    * 
726    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
727    */
728   @Override
729   public void shutdown() {
730     this.shutdownExecutors();
731   }
732
733   @Override
734   protected boolean isSyncDone() {
735
736     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
737
738     if (LOG.isDebugEnabled()) {
739       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
740           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
741     }
742
743     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
744       return false;
745     }
746
747     this.syncInProgress = false;
748
749     return true;
750   }
751
752   /*
753    * (non-Javadoc)
754    * 
755    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
756    */
757   @Override
758   public void clearCache() {
759
760     if (syncInProgress) {
761       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
762           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
763       return;
764     }
765
766     super.clearCache();
767     this.resetCounters();
768     if (entityCounters != null) {
769       entityCounters.clear();
770     }
771
772     allWorkEnumerated = false;
773
774   }
775
776 }