aad7bcb83a655d48c9658ec2517f95a11210ad73
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.autosuggestion.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.Deque;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedDeque;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.Supplier;
40
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.restclient.client.OperationResult;
45 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
46 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
47 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
48 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
49 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
50 import org.onap.aai.sparky.dal.NetworkTransaction;
51 import org.onap.aai.sparky.dal.rest.HttpMethod;
52 import org.onap.aai.sparky.logging.AaiUiMsgs;
53 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
54 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
55 import org.onap.aai.sparky.sync.IndexSynchronizer;
56 import org.onap.aai.sparky.sync.SynchronizerConstants;
57 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
58 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
59 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
60 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
61 import org.onap.aai.sparky.sync.enumeration.OperationState;
62 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
63 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
64 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
65 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
66 import org.onap.aai.sparky.util.NodeUtils;
67 import org.onap.aai.sparky.util.SuggestionsPermutation;
68 import org.slf4j.MDC;
69
70 import com.fasterxml.jackson.core.JsonProcessingException;
71 import com.fasterxml.jackson.databind.JsonNode;
72 import com.fasterxml.jackson.databind.node.ArrayNode;
73
74 /**
75  * The Class AutosuggestionSynchronizer.
76  */
77 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
78     implements IndexSynchronizer {
79
80   private class RetrySuggestionEntitySyncContainer {
81     NetworkTransaction txn;
82     SuggestionSearchEntity ssec;
83
84     /**
85      * Instantiates a new RetrySuggestionEntitySyncContainer.
86      *
87      * @param txn the txn
88      * @param icer the icer
89      */
90     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
91       this.txn = txn;
92       this.ssec = icer;
93     }
94
95     public NetworkTransaction getNetworkTransaction() {
96       return txn;
97     }
98
99     public SuggestionSearchEntity getSuggestionSearchEntity() {
100       return ssec;
101     }
102   }
103
104   private static final Logger LOG =
105       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
106   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
111   private boolean syncInProgress;
112   private Map<String, String> contextMap;
113   protected ExecutorService esPutExecutor;
114   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
115   private Map<String, Integer> retryLimitTracker;
116   private OxmEntityLookup oxmEntityLookup;
117   private SuggestionEntityLookup suggestionEntityLookup;
118   private FiltersConfig filtersConfig;
119
120   /**
121    * Instantiates a new historical entity summarizer.
122    *
123    * @param indexName the index name
124    * @throws Exception the exception
125    */
126   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
127       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
128       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
129       SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig) throws Exception {
130
131     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
132         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
133     
134     this.oxmEntityLookup = oxmEntityLookup;
135     this.suggestionEntityLookup = suggestionEntityLookup;
136     this.allWorkEnumerated = false;
137     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
138     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
139     this.synchronizerName = "Autosuggestion Entity Synchronizer";
140     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
141     this.syncInProgress = false;
142     this.contextMap = MDC.getCopyOfContextMap();
143     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
144     this.retryQueue = new ConcurrentLinkedDeque<RetrySuggestionEntitySyncContainer>();
145     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
146     this.syncDurationInMs = -1;
147     this.filtersConfig = filtersConfig;
148   }
149
150   /**
151    * Collect all the work.
152    *
153    * @return the operation state
154    */
155   private OperationState collectAllTheWork() {
156     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
157     Map<String, SuggestionEntityDescriptor> descriptorMap =
158         suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
159
160     if (descriptorMap.isEmpty()) {
161       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
162       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
163       return OperationState.ERROR;
164     }
165
166     Collection<String> syncTypes = descriptorMap.keySet();
167
168     try {
169
170       /*
171        * launch a parallel async thread to process the documents for each entity-type (to max the of
172        * the configured executor anyway)
173        */
174
175       aaiWorkOnHand.set(syncTypes.size());
176
177       for (String key : syncTypes) {
178
179         supplyAsync(new Supplier<Void>() {
180
181           @Override
182           public Void get() {
183             MDC.setContextMap(contextMap);
184             OperationResult typeLinksResult = null;
185             try {
186               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
187               aaiWorkOnHand.decrementAndGet();
188               processEntityTypeSelfLinks(typeLinksResult);
189             } catch (Exception exc) {
190               LOG.error(AaiUiMsgs.ERROR_GENERIC,
191                   "An error occurred while processing entity self-links. Error: "
192                       + exc.getMessage());
193             }
194
195             return null;
196           }
197
198         }, aaiExecutor).whenComplete((result, error) -> {
199
200           if (error != null) {
201             LOG.error(AaiUiMsgs.ERROR_GENERIC,
202                 "An error occurred getting data from AAI. Error = " + error.getMessage());
203           }
204         });
205
206       }
207
208       while (aaiWorkOnHand.get() != 0) {
209
210         if (LOG.isDebugEnabled()) {
211           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
212         }
213
214         Thread.sleep(1000);
215       }
216
217       aaiWorkOnHand.set(selflinks.size());
218       allWorkEnumerated = true;
219       syncEntityTypes();
220
221       while (!isSyncDone()) {
222         performRetrySync();
223         Thread.sleep(1000);
224       }
225
226       /*
227        * Make sure we don't hang on to retries that failed which could cause issues during future
228        * syncs
229        */
230       retryLimitTracker.clear();
231
232     } catch (Exception exc) {
233       LOG.error(AaiUiMsgs.ERROR_GENERIC,
234           "An error occurred while performing the sync.  Error: " + exc.getMessage());
235     }
236
237     return OperationState.OK;
238
239   }
240
241   /*
242    * (non-Javadoc)
243    * 
244    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
245    */
246   @Override
247   public OperationState doSync() {
248     this.syncDurationInMs = -1;
249     syncStartedTimeStampInMs = System.currentTimeMillis();
250     String txnID = NodeUtils.getRandomTxnId();
251     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
252
253     return collectAllTheWork();
254   }
255
256   /**
257    * Process entity type self links.
258    *
259    * @param operationResult the operation result
260    */
261   private void processEntityTypeSelfLinks(OperationResult operationResult) {
262
263     JsonNode rootNode = null;
264     
265     if ( operationResult == null ) {
266         return;
267     }
268
269     final String jsonResult = operationResult.getResult();
270
271     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
272
273       try {
274         rootNode = mapper.readTree(jsonResult);
275       } catch (IOException exc) {
276         String message = "Could not deserialize JSON (representing operation result) as node tree. "
277             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
278         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
279       }
280
281       JsonNode resultData = rootNode.get("result-data");
282       ArrayNode resultDataArrayNode = null;
283
284       if (resultData.isArray()) {
285         resultDataArrayNode = (ArrayNode) resultData;
286
287         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
288         JsonNode element = null;
289
290         while (elementIterator.hasNext()) {
291           element = elementIterator.next();
292
293           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
294           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
295
296           OxmEntityDescriptor descriptor = null;
297
298           if (resourceType != null && resourceLink != null) {
299
300             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
301
302             if (descriptor == null) {
303               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
304               // go to next element in iterator
305               continue;
306             }
307             selflinks.add(new SelfLinkDescriptor(resourceLink,
308                 SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
309
310
311           }
312         }
313       }
314     }
315   }
316
317   /**
318    * Sync entity types.
319    */
320   private void syncEntityTypes() {
321
322     while (selflinks.peek() != null) {
323
324       SelfLinkDescriptor linkDescriptor = selflinks.poll();
325       aaiWorkOnHand.decrementAndGet();
326
327       OxmEntityDescriptor descriptor = null;
328
329       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
330
331         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
332
333         if (descriptor == null) {
334           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
335           // go to next element in iterator
336           continue;
337         }
338
339         NetworkTransaction txn = new NetworkTransaction();
340         txn.setDescriptor(descriptor);
341         txn.setLink(linkDescriptor.getSelfLink());
342         txn.setOperationType(HttpMethod.GET);
343         txn.setEntityType(linkDescriptor.getEntityType());
344
345         aaiWorkOnHand.incrementAndGet();
346
347         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
348             .whenComplete((result, error) -> {
349
350               aaiWorkOnHand.decrementAndGet();
351
352               if (error != null) {
353                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
354               } else {
355                 if (result == null) {
356                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
357                       linkDescriptor.getSelfLink());
358                 } else {
359                   updateActiveInventoryCounters(result);
360                   fetchDocumentForUpsert(result);
361                 }
362               }
363             });
364       }
365
366     }
367
368   }
369
370   /*
371    * Return a set of valid suggestion attributes for the provided entityName that are present in the
372    * JSON
373    * 
374    * @param node JSON node in which the attributes should be found
375    * 
376    * @param entityName Name of the entity
377    * 
378    * @return List of all valid suggestion attributes(key's)
379    */
380   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
381     List<String> suggestableAttr = new ArrayList<String>();
382
383     HashMap<String, String> desc =
384         suggestionEntityLookup.getSuggestionSearchEntityOxmModel().get(entityName);
385
386     if (desc != null) {
387
388       String attr = desc.get("suggestibleAttributes");
389
390       if (attr != null) {
391         suggestableAttr = Arrays.asList(attr.split(","));
392         List<String> suggestableValue = new ArrayList<String>();
393         for (String attribute : suggestableAttr) {
394           if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
395             suggestableValue.add(attribute);
396           }
397         }
398         return suggestableValue;
399       }
400     }
401
402     return new ArrayList<String>();
403   }
404
405   /**
406    * Fetch all the documents for upsert. Based on the number of permutations that are available the
407    * number of documents will be different
408    *
409    * @param txn the txn
410    */
411   private void fetchDocumentForUpsert(NetworkTransaction txn) {
412     if (!txn.getOperationResult().wasSuccessful()) {
413       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
414       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
415       return;
416     }
417     try {
418       final String jsonResult = txn.getOperationResult().getResult();
419
420       if (jsonResult != null && jsonResult.length() > 0) {
421
422         // Step 1: Calculate the number of possible permutations of attributes
423         String entityName = txn.getDescriptor().getEntityName();
424         JsonNode entityNode = mapper.readTree(jsonResult);
425
426         List<String> availableSuggestableAttrName =
427             getSuggestableAttrNamesFromReponse(entityNode, entityName);
428         
429         ArrayList<ArrayList<String>> uniqueLists =
430             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
431         // Now we have a list of all possible permutations for the status that are
432         // defined for this entity type. Try inserting a document for every combination.
433         for (ArrayList<String> uniqueList : uniqueLists) {
434
435           SuggestionSearchEntity sse = new SuggestionSearchEntity(filtersConfig, suggestionEntityLookup);
436           sse.setSuggestableAttr(uniqueList);
437           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
438           sse.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
439           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
440           // The unique id for the document will be created at derive fields
441           sse.deriveFields();
442           // Insert the document only if it has valid statuses
443           if (sse.isSuggestableDoc()) {
444             String link = null;
445             try {
446               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
447             } catch (Exception exc) {
448               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
449             }
450
451             if (link != null) {
452               NetworkTransaction n2 = new NetworkTransaction();
453               n2.setLink(link);
454               n2.setEntityType(txn.getEntityType());
455               n2.setDescriptor(txn.getDescriptor());
456               n2.setOperationType(HttpMethod.GET);
457
458               esWorkOnHand.incrementAndGet();
459
460               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
461                   .whenComplete((result, error) -> {
462
463                     esWorkOnHand.decrementAndGet();
464
465                     if (error != null) {
466                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
467                     } else {
468                       updateElasticSearchCounters(result);
469                       performDocumentUpsert(result, sse);
470                     }
471                   });
472             }
473           }
474         }
475       }
476     } catch (JsonProcessingException exc) {
477         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a json processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
478     } catch (IOException exc) {
479         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a io processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
480     }
481   }
482
483   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
484       NetworkTransaction txn) throws JsonProcessingException, IOException {
485
486     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
487
488     sse.setEntityType(resultDescriptor.getEntityName());
489
490     JsonNode entityNode = mapper.readTree(result);
491
492     List<String> primaryKeyValues = new ArrayList<String>();
493     String pkeyValue = null;
494
495     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
496       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
497       if (pkeyValue != null) {
498         primaryKeyValues.add(pkeyValue);
499       } else {
500         String message = "populateSuggestionSearchEntityDocument(),"
501             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
502         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
503       }
504     }
505
506     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
507     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
508     sse.generateSuggestionInputPermutations();
509   }
510
511   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
512     /**
513      * <p>
514      * <ul>
515      * As part of the response processing we need to do the following:
516      * <li>1. Extract the version (if present), it will be the ETAG when we use the
517      * Search-Abstraction-Service
518      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
519      * tag
520      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
521      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
522      * </ul>
523      * </p>
524      */
525     String link = null;
526     try {
527       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
528     } catch (Exception exc) {
529       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
530       return;
531     }
532
533     boolean wasEntryDiscovered = false;
534     if (esGetTxn.getOperationResult().getResultCode() == 404) {
535       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
536     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
537       wasEntryDiscovered = true;
538     } else {
539       /*
540        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
541        * found TODO -> Should we return.
542        */
543       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
544           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
545       return;
546     }
547     // Insert a new document only if the paylod is different.
548     // This is determined by hashing the payload and using it as a id for the document
549     //
550     if (!wasEntryDiscovered) {
551       try {
552         String jsonPayload = null;
553
554         jsonPayload = sse.getAsJson();
555         if (link != null && jsonPayload != null) {
556
557           NetworkTransaction updateElasticTxn = new NetworkTransaction();
558           updateElasticTxn.setLink(link);
559           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
560           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
561           updateElasticTxn.setOperationType(HttpMethod.PUT);
562
563           esWorkOnHand.incrementAndGet();
564           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
565               esPutExecutor).whenComplete((result, error) -> {
566
567                 esWorkOnHand.decrementAndGet();
568
569                 if (error != null) {
570                   String message = "Suggestion search entity sync UPDATE PUT error - "
571                       + error.getLocalizedMessage();
572                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
573                 } else {
574                   updateElasticSearchCounters(result);
575                   processStoreDocumentResult(result, esGetTxn, sse);
576                 }
577               });
578         }
579       } catch (Exception exc) {
580         String message =
581             "Exception caught during suggestion search entity sync PUT operation. Message - "
582                 + exc.getLocalizedMessage();
583         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
584       }
585     }
586   }
587
588   private void processStoreDocumentResult(NetworkTransaction esPutResult,
589       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
590
591     OperationResult or = esPutResult.getOperationResult();
592
593     if (!or.wasSuccessful()) {
594       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
595
596         if (shouldAllowRetry(sse.getId())) {
597           esWorkOnHand.incrementAndGet();
598
599           RetrySuggestionEntitySyncContainer rssec =
600               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
601           retryQueue.push(rssec);
602
603           String message = "Store document failed during suggestion search entity synchronization"
604               + " due to version conflict. Entity will be re-synced.";
605           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
606         }
607       } else {
608         String message =
609             "Store document failed during suggestion search entity synchronization with result code "
610                 + or.getResultCode() + " and result message " + or.getResult();
611         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
612       }
613     }
614   }
615
616   /**
617    * Perform retry sync.
618    */
619   private void performRetrySync() {
620     while (retryQueue.peek() != null) {
621
622       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
623       if (susc != null) {
624
625         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
626         NetworkTransaction txn = susc.getNetworkTransaction();
627
628         String link = null;
629         try {
630           /*
631            * In this retry flow the se object has already derived its fields
632            */
633           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
634         } catch (Exception exc) {
635           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
636         }
637
638         if (link != null) {
639           NetworkTransaction retryTransaction = new NetworkTransaction();
640           retryTransaction.setLink(link);
641           retryTransaction.setEntityType(txn.getEntityType());
642           retryTransaction.setDescriptor(txn.getDescriptor());
643           retryTransaction.setOperationType(HttpMethod.GET);
644
645           /*
646            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
647            * called incrementAndGet when queuing the failed PUT!
648            */
649
650           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
651               esExecutor).whenComplete((result, error) -> {
652
653                 esWorkOnHand.decrementAndGet();
654
655                 if (error != null) {
656                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
657                 } else {
658                   updateElasticSearchCounters(result);
659                   performDocumentUpsert(result, sus);
660                 }
661               });
662         }
663
664       }
665     }
666   }
667
668   /**
669    * Should allow retry.
670    *
671    * @param id the id
672    * @return true, if successful
673    */
674   private boolean shouldAllowRetry(String id) {
675     boolean isRetryAllowed = true;
676     if (retryLimitTracker.get(id) != null) {
677       Integer currentCount = retryLimitTracker.get(id);
678       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
679         isRetryAllowed = false;
680         String message = "Searchable entity re-sync limit reached for " + id
681             + ", re-sync will no longer be attempted for this entity";
682         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
683       } else {
684         Integer newCount = new Integer(currentCount.intValue() + 1);
685         retryLimitTracker.put(id, newCount);
686       }
687     } else {
688       Integer firstRetryCount = new Integer(1);
689       retryLimitTracker.put(id, firstRetryCount);
690     }
691
692     return isRetryAllowed;
693   }
694
695
696
697   @Override
698   public SynchronizerState getState() {
699
700     if (!isSyncDone()) {
701       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
702     }
703
704     return SynchronizerState.IDLE;
705
706   }
707
708   /*
709    * (non-Javadoc)
710    * 
711    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
712    */
713   @Override
714   public String getStatReport(boolean showFinalReport) {
715     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
716     return getStatReport(syncDurationInMs, showFinalReport);
717   }
718
719   /*
720    * (non-Javadoc)
721    * 
722    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
723    */
724   @Override
725   public void shutdown() {
726     this.shutdownExecutors();
727   }
728
729   @Override
730   protected boolean isSyncDone() {
731
732     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
733
734     if (LOG.isDebugEnabled()) {
735       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
736           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
737     }
738
739     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
740       return false;
741     }
742
743     this.syncInProgress = false;
744
745     return true;
746   }
747
748   /*
749    * (non-Javadoc)
750    * 
751    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
752    */
753   @Override
754   public void clearCache() {
755
756     if (syncInProgress) {
757       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
758           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
759       return;
760     }
761
762     super.clearCache();
763     this.resetCounters();
764     if (entityCounters != null) {
765       entityCounters.clear();
766     }
767
768     allWorkEnumerated = false;
769
770   }
771
772 }