Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.autosuggestion.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Deque;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Supplier;
42
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45 import org.onap.aai.cl.mdc.MdcContext;
46 import org.onap.aai.restclient.client.OperationResult;
47 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
48 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
49 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
50 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
51 import org.onap.aai.sparky.dal.NetworkTransaction;
52 import org.onap.aai.sparky.dal.aai.config.ActiveInventoryConfig;
53 import org.onap.aai.sparky.dal.rest.HttpMethod;
54 import org.onap.aai.sparky.logging.AaiUiMsgs;
55 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
56 import org.onap.aai.sparky.sync.IndexSynchronizer;
57 import org.onap.aai.sparky.sync.SynchronizerConstants;
58 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
59 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
60 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
61 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
62 import org.onap.aai.sparky.sync.enumeration.OperationState;
63 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
64 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
65 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
66 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
67 import org.onap.aai.sparky.util.NodeUtils;
68 import org.onap.aai.sparky.util.SuggestionsPermutation;
69 import org.slf4j.MDC;
70
71 import com.fasterxml.jackson.core.JsonProcessingException;
72 import com.fasterxml.jackson.databind.JsonNode;
73 import com.fasterxml.jackson.databind.node.ArrayNode;
74
75 /**
76  * The Class AutosuggestionSynchronizer.
77  */
78 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
79     implements IndexSynchronizer {
80
81   private class RetrySuggestionEntitySyncContainer {
82     NetworkTransaction txn;
83     SuggestionSearchEntity ssec;
84
85     /**
86      * Instantiates a new RetrySuggestionEntitySyncContainer.
87      *
88      * @param txn the txn
89      * @param icer the icer
90      */
91     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
92       this.txn = txn;
93       this.ssec = icer;
94     }
95
96     public NetworkTransaction getNetworkTransaction() {
97       return txn;
98     }
99
100     public SuggestionSearchEntity getSuggestionSearchEntity() {
101       return ssec;
102     }
103   }
104
105   private static final Logger LOG =
106       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
107   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
108
109   private boolean allWorkEnumerated;
110   private Deque<SelfLinkDescriptor> selflinks;
111   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
112   private boolean syncInProgress;
113   private Map<String, String> contextMap;
114   protected ExecutorService esPutExecutor;
115   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
116   private Map<String, Integer> retryLimitTracker;
117
118   /**
119    * Instantiates a new historical entity summarizer.
120    *
121    * @param indexName the index name
122    * @throws Exception the exception
123    */
124   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
125       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
126       NetworkStatisticsConfig esStatConfig) throws Exception {
127     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
128         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
129
130     this.allWorkEnumerated = false;
131     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
132     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
133     this.synchronizerName = "Autosuggestion Entity Synchronizer";
134     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
135     this.syncInProgress = false;
136     this.contextMap = MDC.getCopyOfContextMap();
137     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
138     this.syncDurationInMs = -1;
139   }
140
141   /**
142    * Collect all the work.
143    *
144    * @return the operation state
145    */
146   private OperationState collectAllTheWork() {
147     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
148     Map<String, SuggestionEntityDescriptor> descriptorMap =
149         SuggestionEntityLookup.getInstance().getSuggestionSearchEntityDescriptors();
150
151     if (descriptorMap.isEmpty()) {
152       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
153       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
154       return OperationState.ERROR;
155     }
156
157     Collection<String> syncTypes = descriptorMap.keySet();
158
159     try {
160
161       /*
162        * launch a parallel async thread to process the documents for each entity-type (to max the of
163        * the configured executor anyway)
164        */
165
166       aaiWorkOnHand.set(syncTypes.size());
167
168       for (String key : syncTypes) {
169
170         supplyAsync(new Supplier<Void>() {
171
172           @Override
173           public Void get() {
174             MDC.setContextMap(contextMap);
175             OperationResult typeLinksResult = null;
176             try {
177               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
178               aaiWorkOnHand.decrementAndGet();
179               processEntityTypeSelfLinks(typeLinksResult);
180             } catch (Exception exc) {
181               // TODO -> LOG, what should be logged here?
182             }
183
184             return null;
185           }
186
187         }, aaiExecutor).whenComplete((result, error) -> {
188
189           if (error != null) {
190             LOG.error(AaiUiMsgs.ERROR_GENERIC,
191                 "An error occurred getting data from AAI. Error = " + error.getMessage());
192           }
193         });
194
195       }
196
197       while (aaiWorkOnHand.get() != 0) {
198
199         if (LOG.isDebugEnabled()) {
200           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
201         }
202
203         Thread.sleep(1000);
204       }
205
206       aaiWorkOnHand.set(selflinks.size());
207       allWorkEnumerated = true;
208       syncEntityTypes();
209
210       while (!isSyncDone()) {
211         performRetrySync();
212         Thread.sleep(1000);
213       }
214
215       /*
216        * Make sure we don't hang on to retries that failed which could cause issues during future
217        * syncs
218        */
219       retryLimitTracker.clear();
220
221     } catch (Exception exc) {
222       // TODO -> LOG, waht should be logged here?
223     }
224
225     return OperationState.OK;
226
227   }
228
229   /*
230    * (non-Javadoc)
231    * 
232    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
233    */
234   @Override
235   public OperationState doSync() {
236     this.syncDurationInMs = -1;
237     syncStartedTimeStampInMs = System.currentTimeMillis();
238     String txnID = NodeUtils.getRandomTxnId();
239     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
240
241     return collectAllTheWork();
242   }
243
244   /**
245    * Process entity type self links.
246    *
247    * @param operationResult the operation result
248    */
249   private void processEntityTypeSelfLinks(OperationResult operationResult) {
250
251     JsonNode rootNode = null;
252
253     final String jsonResult = operationResult.getResult();
254
255     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
256
257       try {
258         rootNode = mapper.readTree(jsonResult);
259       } catch (IOException exc) {
260         String message = "Could not deserialize JSON (representing operation result) as node tree. "
261             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
262         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
263       }
264
265       JsonNode resultData = rootNode.get("result-data");
266       ArrayNode resultDataArrayNode = null;
267
268       if (resultData.isArray()) {
269         resultDataArrayNode = (ArrayNode) resultData;
270
271         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
272         JsonNode element = null;
273
274         while (elementIterator.hasNext()) {
275           element = elementIterator.next();
276
277           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
278           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
279
280           OxmEntityDescriptor descriptor = null;
281
282           if (resourceType != null && resourceLink != null) {
283
284             descriptor = OxmEntityLookup.getInstance().getEntityDescriptors().get(resourceType);
285
286             if (descriptor == null) {
287               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
288               // go to next element in iterator
289               continue;
290             }
291             selflinks.add(new SelfLinkDescriptor(resourceLink,
292                 SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
293
294
295           }
296         }
297       }
298     }
299   }
300
301   /**
302    * Sync entity types.
303    */
304   private void syncEntityTypes() {
305
306     while (selflinks.peek() != null) {
307
308       SelfLinkDescriptor linkDescriptor = selflinks.poll();
309       aaiWorkOnHand.decrementAndGet();
310
311       OxmEntityDescriptor descriptor = null;
312
313       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
314
315         descriptor = OxmEntityLookup.getInstance().getEntityDescriptors()
316             .get(linkDescriptor.getEntityType());
317
318         if (descriptor == null) {
319           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
320           // go to next element in iterator
321           continue;
322         }
323
324         NetworkTransaction txn = new NetworkTransaction();
325         txn.setDescriptor(descriptor);
326         txn.setLink(linkDescriptor.getSelfLink());
327         txn.setOperationType(HttpMethod.GET);
328         txn.setEntityType(linkDescriptor.getEntityType());
329
330         aaiWorkOnHand.incrementAndGet();
331
332         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
333             .whenComplete((result, error) -> {
334
335               aaiWorkOnHand.decrementAndGet();
336
337               if (error != null) {
338                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
339               } else {
340                 if (result == null) {
341                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
342                       linkDescriptor.getSelfLink());
343                 } else {
344                   updateActiveInventoryCounters(result);
345                   fetchDocumentForUpsert(result);
346                 }
347               }
348             });
349       }
350
351     }
352
353   }
354
355   /*
356    * Return a set of valid suggestion attributes for the provided entityName that are present in the
357    * JSON
358    * 
359    * @param node JSON node in which the attributes should be found
360    * 
361    * @param entityName Name of the entity
362    * 
363    * @return List of all valid suggestion attributes(key's)
364    */
365   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
366     List<String> suggestableAttr = new ArrayList<String>();
367     HashMap<String, String> desc =
368         SuggestionEntityLookup.getInstance().getSuggestionSearchEntityOxmModel().get(entityName);
369     String attr = desc.get("suggestibleAttributes");
370     suggestableAttr = Arrays.asList(attr.split(","));
371     List<String> suggestableValue = new ArrayList<>();
372     for (String attribute : suggestableAttr) {
373       if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
374         suggestableValue.add(attribute);
375       }
376     }
377     return suggestableValue;
378   }
379
380   /**
381    * Fetch all the documents for upsert. Based on the number of permutations that are available the
382    * number of documents will be different
383    *
384    * @param txn the txn
385    */
386   private void fetchDocumentForUpsert(NetworkTransaction txn) {
387     if (!txn.getOperationResult().wasSuccessful()) {
388       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
389       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
390       return;
391     }
392     try {
393       final String jsonResult = txn.getOperationResult().getResult();
394
395       if (jsonResult != null && jsonResult.length() > 0) {
396
397         // Step 1: Calculate the number of possible permutations of attributes
398         String entityName = txn.getDescriptor().getEntityName();
399         JsonNode entityNode = mapper.readTree(jsonResult);
400
401         List<String> availableSuggestableAttrName =
402             getSuggestableAttrNamesFromReponse(entityNode, entityName);
403
404         ArrayList<ArrayList<String>> uniqueLists =
405             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
406         // Now we have a list of all possible permutations for the status that are
407         // defined for this entity type. Try inserting a document for every combination.
408         for (ArrayList<String> uniqueList : uniqueLists) {
409
410           SuggestionSearchEntity sse =
411               new SuggestionSearchEntity(SuggestionEntityLookup.getInstance());
412           sse.setSuggestableAttr(uniqueList);
413           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
414           sse.setLink(ActiveInventoryConfig.extractResourcePath(txn.getLink()));
415           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
416           // The unique id for the document will be created at derive fields
417           sse.deriveFields();
418           // Insert the document only if it has valid statuses
419           if (sse.isSuggestableDoc()) {
420             String link = null;
421             try {
422               link = getElasticFullUrl("/" + sse.getId(), getIndexName());
423             } catch (Exception exc) {
424               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
425             }
426
427             if (link != null) {
428               NetworkTransaction n2 = new NetworkTransaction();
429               n2.setLink(link);
430               n2.setEntityType(txn.getEntityType());
431               n2.setDescriptor(txn.getDescriptor());
432               n2.setOperationType(HttpMethod.GET);
433
434               esWorkOnHand.incrementAndGet();
435
436               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
437                   .whenComplete((result, error) -> {
438
439                     esWorkOnHand.decrementAndGet();
440
441                     if (error != null) {
442                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
443                     } else {
444                       updateElasticSearchCounters(result);
445                       performDocumentUpsert(result, sse);
446                     }
447                   });
448             }
449           }
450         }
451       }
452     } catch (JsonProcessingException exc) {
453       // TODO -> LOG, waht should be logged here?
454     } catch (IOException exc) {
455       // TODO -> LOG, waht should be logged here?
456     }
457   }
458
459   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
460       NetworkTransaction txn) throws JsonProcessingException, IOException {
461
462     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
463
464     sse.setEntityType(resultDescriptor.getEntityName());
465
466     JsonNode entityNode = mapper.readTree(result);
467
468     List<String> primaryKeyValues = new ArrayList<String>();
469     String pkeyValue = null;
470
471     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
472       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
473       if (pkeyValue != null) {
474         primaryKeyValues.add(pkeyValue);
475       } else {
476         String message = "populateSuggestionSearchEntityDocument(),"
477             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
478         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
479       }
480     }
481
482     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
483     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
484     sse.generateSuggestionInputPermutations();
485   }
486
487   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
488     /**
489      * <p>
490      * <ul>
491      * As part of the response processing we need to do the following:
492      * <li>1. Extract the version (if present), it will be the ETAG when we use the
493      * Search-Abstraction-Service
494      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
495      * tag
496      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
497      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
498      * </ul>
499      * </p>
500      */
501     String link = null;
502     try {
503       link = getElasticFullUrl("/" + sse.getId(), getIndexName());
504     } catch (Exception exc) {
505       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
506       return;
507     }
508
509     boolean wasEntryDiscovered = false;
510     if (esGetTxn.getOperationResult().getResultCode() == 404) {
511       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
512     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
513       wasEntryDiscovered = true;
514     } else {
515       /*
516        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
517        * found TODO -> Should we return.
518        */
519       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
520           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
521       return;
522     }
523     // Insert a new document only if the paylod is different.
524     // This is determined by hashing the payload and using it as a id for the document
525     //
526     if (!wasEntryDiscovered) {
527       try {
528         String jsonPayload = null;
529
530         jsonPayload = sse.getAsJson();
531         if (link != null && jsonPayload != null) {
532
533           NetworkTransaction updateElasticTxn = new NetworkTransaction();
534           updateElasticTxn.setLink(link);
535           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
536           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
537           updateElasticTxn.setOperationType(HttpMethod.PUT);
538
539           esWorkOnHand.incrementAndGet();
540           supplyAsync(
541               new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
542               esPutExecutor).whenComplete((result, error) -> {
543
544                 esWorkOnHand.decrementAndGet();
545
546                 if (error != null) {
547                   String message = "Suggestion search entity sync UPDATE PUT error - "
548                       + error.getLocalizedMessage();
549                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
550                 } else {
551                   updateElasticSearchCounters(result);
552                   processStoreDocumentResult(result, esGetTxn, sse);
553                 }
554               });
555         }
556       } catch (Exception exc) {
557         String message =
558             "Exception caught during suggestion search entity sync PUT operation. Message - "
559                 + exc.getLocalizedMessage();
560         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
561       }
562     }
563   }
564
565   private void processStoreDocumentResult(NetworkTransaction esPutResult,
566       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
567
568     OperationResult or = esPutResult.getOperationResult();
569
570     if (!or.wasSuccessful()) {
571       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
572
573         if (shouldAllowRetry(sse.getId())) {
574           esWorkOnHand.incrementAndGet();
575
576           RetrySuggestionEntitySyncContainer rssec =
577               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
578           retryQueue.push(rssec);
579
580           String message = "Store document failed during suggestion search entity synchronization"
581               + " due to version conflict. Entity will be re-synced.";
582           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
583         }
584       } else {
585         String message =
586             "Store document failed during suggestion search entity synchronization with result code "
587                 + or.getResultCode() + " and result message " + or.getResult();
588         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
589       }
590     }
591   }
592
593   /**
594    * Perform retry sync.
595    */
596   private void performRetrySync() {
597     while (retryQueue.peek() != null) {
598
599       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
600       if (susc != null) {
601
602         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
603         NetworkTransaction txn = susc.getNetworkTransaction();
604
605         String link = null;
606         try {
607           /*
608            * In this retry flow the se object has already derived its fields
609            */
610           link = getElasticFullUrl("/" + sus.getId(), getIndexName());
611         } catch (Exception exc) {
612           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
613         }
614
615         if (link != null) {
616           NetworkTransaction retryTransaction = new NetworkTransaction();
617           retryTransaction.setLink(link);
618           retryTransaction.setEntityType(txn.getEntityType());
619           retryTransaction.setDescriptor(txn.getDescriptor());
620           retryTransaction.setOperationType(HttpMethod.GET);
621
622           /*
623            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
624            * called incrementAndGet when queuing the failed PUT!
625            */
626
627           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
628               esExecutor).whenComplete((result, error) -> {
629
630                 esWorkOnHand.decrementAndGet();
631
632                 if (error != null) {
633                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
634                 } else {
635                   updateElasticSearchCounters(result);
636                   performDocumentUpsert(result, sus);
637                 }
638               });
639         }
640
641       }
642     }
643   }
644
645   /**
646    * Should allow retry.
647    *
648    * @param id the id
649    * @return true, if successful
650    */
651   private boolean shouldAllowRetry(String id) {
652     boolean isRetryAllowed = true;
653     if (retryLimitTracker.get(id) != null) {
654       Integer currentCount = retryLimitTracker.get(id);
655       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
656         isRetryAllowed = false;
657         String message = "Searchable entity re-sync limit reached for " + id
658             + ", re-sync will no longer be attempted for this entity";
659         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
660       } else {
661         Integer newCount = new Integer(currentCount.intValue() + 1);
662         retryLimitTracker.put(id, newCount);
663       }
664     } else {
665       Integer firstRetryCount = new Integer(1);
666       retryLimitTracker.put(id, firstRetryCount);
667     }
668
669     return isRetryAllowed;
670   }
671
672
673
674   @Override
675   public SynchronizerState getState() {
676
677     if (!isSyncDone()) {
678       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
679     }
680
681     return SynchronizerState.IDLE;
682
683   }
684
685   /*
686    * (non-Javadoc)
687    * 
688    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
689    */
690   @Override
691   public String getStatReport(boolean showFinalReport) {
692     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
693     return getStatReport(syncDurationInMs, showFinalReport);
694   }
695
696   /*
697    * (non-Javadoc)
698    * 
699    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
700    */
701   @Override
702   public void shutdown() {
703     this.shutdownExecutors();
704   }
705
706   @Override
707   protected boolean isSyncDone() {
708
709     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
710
711     if (LOG.isDebugEnabled()) {
712       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
713           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
714     }
715
716     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
717       return false;
718     }
719
720     this.syncInProgress = false;
721
722     return true;
723   }
724
725   /*
726    * (non-Javadoc)
727    * 
728    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
729    */
730   @Override
731   public void clearCache() {
732
733     if (syncInProgress) {
734       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
735           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
736       return;
737     }
738
739     super.clearCache();
740     this.resetCounters();
741     if (entityCounters != null) {
742       entityCounters.clear();
743     }
744
745     allWorkEnumerated = false;
746
747   }
748
749 }