Update Sparky README files
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.autosuggestion.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Deque;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Supplier;
42
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45 import org.onap.aai.cl.mdc.MdcContext;
46 import org.onap.aai.restclient.client.OperationResult;
47 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
48 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
49 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
50 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
51 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
52 import org.onap.aai.sparky.dal.NetworkTransaction;
53 import org.onap.aai.sparky.dal.rest.HttpMethod;
54 import org.onap.aai.sparky.logging.AaiUiMsgs;
55 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
56 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
57 import org.onap.aai.sparky.sync.IndexSynchronizer;
58 import org.onap.aai.sparky.sync.SynchronizerConstants;
59 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
60 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
61 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
62 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
63 import org.onap.aai.sparky.sync.enumeration.OperationState;
64 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
65 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
66 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
67 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
68 import org.onap.aai.sparky.util.NodeUtils;
69 import org.onap.aai.sparky.util.SuggestionsPermutation;
70 import org.slf4j.MDC;
71
72 import com.fasterxml.jackson.core.JsonProcessingException;
73 import com.fasterxml.jackson.databind.JsonNode;
74 import com.fasterxml.jackson.databind.node.ArrayNode;
75
76 /**
77  * The Class AutosuggestionSynchronizer.
78  */
79 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
80     implements IndexSynchronizer {
81
82   private class RetrySuggestionEntitySyncContainer {
83     NetworkTransaction txn;
84     SuggestionSearchEntity ssec;
85
86     /**
87      * Instantiates a new RetrySuggestionEntitySyncContainer.
88      *
89      * @param txn the txn
90      * @param icer the icer
91      */
92     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
93       this.txn = txn;
94       this.ssec = icer;
95     }
96
97     public NetworkTransaction getNetworkTransaction() {
98       return txn;
99     }
100
101     public SuggestionSearchEntity getSuggestionSearchEntity() {
102       return ssec;
103     }
104   }
105
106   private static final Logger LOG =
107       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
108   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
109
110   private boolean allWorkEnumerated;
111   private Deque<SelfLinkDescriptor> selflinks;
112   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
113   private boolean syncInProgress;
114   private Map<String, String> contextMap;
115   protected ExecutorService esPutExecutor;
116   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
117   private Map<String, Integer> retryLimitTracker;
118   private OxmEntityLookup oxmEntityLookup;
119   private SuggestionEntityLookup suggestionEntityLookup;
120   private FiltersConfig filtersConfig;
121
122   /**
123    * Instantiates a new historical entity summarizer.
124    *
125    * @param indexName the index name
126    * @throws Exception the exception
127    */
128   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
129       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
130       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
131       SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig) throws Exception {
132
133     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
134         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
135     
136     this.oxmEntityLookup = oxmEntityLookup;
137     this.suggestionEntityLookup = suggestionEntityLookup;
138     this.allWorkEnumerated = false;
139     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
140     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
141     this.synchronizerName = "Autosuggestion Entity Synchronizer";
142     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
143     this.syncInProgress = false;
144     this.contextMap = MDC.getCopyOfContextMap();
145     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
146     this.syncDurationInMs = -1;
147     this.filtersConfig = filtersConfig;
148   }
149
150   /**
151    * Collect all the work.
152    *
153    * @return the operation state
154    */
155   private OperationState collectAllTheWork() {
156     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
157     Map<String, SuggestionEntityDescriptor> descriptorMap =
158         suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
159
160     if (descriptorMap.isEmpty()) {
161       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
162       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
163       return OperationState.ERROR;
164     }
165
166     Collection<String> syncTypes = descriptorMap.keySet();
167
168     try {
169
170       /*
171        * launch a parallel async thread to process the documents for each entity-type (to max the of
172        * the configured executor anyway)
173        */
174
175       aaiWorkOnHand.set(syncTypes.size());
176
177       for (String key : syncTypes) {
178
179         supplyAsync(new Supplier<Void>() {
180
181           @Override
182           public Void get() {
183             MDC.setContextMap(contextMap);
184             OperationResult typeLinksResult = null;
185             try {
186               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
187               aaiWorkOnHand.decrementAndGet();
188               processEntityTypeSelfLinks(typeLinksResult);
189             } catch (Exception exc) {
190               // TODO -> LOG, what should be logged here?
191             }
192
193             return null;
194           }
195
196         }, aaiExecutor).whenComplete((result, error) -> {
197
198           if (error != null) {
199             LOG.error(AaiUiMsgs.ERROR_GENERIC,
200                 "An error occurred getting data from AAI. Error = " + error.getMessage());
201           }
202         });
203
204       }
205
206       while (aaiWorkOnHand.get() != 0) {
207
208         if (LOG.isDebugEnabled()) {
209           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
210         }
211
212         Thread.sleep(1000);
213       }
214
215       aaiWorkOnHand.set(selflinks.size());
216       allWorkEnumerated = true;
217       syncEntityTypes();
218
219       while (!isSyncDone()) {
220         performRetrySync();
221         Thread.sleep(1000);
222       }
223
224       /*
225        * Make sure we don't hang on to retries that failed which could cause issues during future
226        * syncs
227        */
228       retryLimitTracker.clear();
229
230     } catch (Exception exc) {
231       // TODO -> LOG, waht should be logged here?
232     }
233
234     return OperationState.OK;
235
236   }
237
238   /*
239    * (non-Javadoc)
240    * 
241    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
242    */
243   @Override
244   public OperationState doSync() {
245     this.syncDurationInMs = -1;
246     syncStartedTimeStampInMs = System.currentTimeMillis();
247     String txnID = NodeUtils.getRandomTxnId();
248     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
249
250     return collectAllTheWork();
251   }
252
253   /**
254    * Process entity type self links.
255    *
256    * @param operationResult the operation result
257    */
258   private void processEntityTypeSelfLinks(OperationResult operationResult) {
259
260     JsonNode rootNode = null;
261
262     final String jsonResult = operationResult.getResult();
263
264     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
265
266       try {
267         rootNode = mapper.readTree(jsonResult);
268       } catch (IOException exc) {
269         String message = "Could not deserialize JSON (representing operation result) as node tree. "
270             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
271         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
272       }
273
274       JsonNode resultData = rootNode.get("result-data");
275       ArrayNode resultDataArrayNode = null;
276
277       if (resultData.isArray()) {
278         resultDataArrayNode = (ArrayNode) resultData;
279
280         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
281         JsonNode element = null;
282
283         while (elementIterator.hasNext()) {
284           element = elementIterator.next();
285
286           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
287           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
288
289           OxmEntityDescriptor descriptor = null;
290
291           if (resourceType != null && resourceLink != null) {
292
293             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
294
295             if (descriptor == null) {
296               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
297               // go to next element in iterator
298               continue;
299             }
300             selflinks.add(new SelfLinkDescriptor(resourceLink,
301                 SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
302
303
304           }
305         }
306       }
307     }
308   }
309
310   /**
311    * Sync entity types.
312    */
313   private void syncEntityTypes() {
314
315     while (selflinks.peek() != null) {
316
317       SelfLinkDescriptor linkDescriptor = selflinks.poll();
318       aaiWorkOnHand.decrementAndGet();
319
320       OxmEntityDescriptor descriptor = null;
321
322       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
323
324         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
325
326         if (descriptor == null) {
327           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
328           // go to next element in iterator
329           continue;
330         }
331
332         NetworkTransaction txn = new NetworkTransaction();
333         txn.setDescriptor(descriptor);
334         txn.setLink(linkDescriptor.getSelfLink());
335         txn.setOperationType(HttpMethod.GET);
336         txn.setEntityType(linkDescriptor.getEntityType());
337
338         aaiWorkOnHand.incrementAndGet();
339
340         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
341             .whenComplete((result, error) -> {
342
343               aaiWorkOnHand.decrementAndGet();
344
345               if (error != null) {
346                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
347               } else {
348                 if (result == null) {
349                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
350                       linkDescriptor.getSelfLink());
351                 } else {
352                   updateActiveInventoryCounters(result);
353                   fetchDocumentForUpsert(result);
354                 }
355               }
356             });
357       }
358
359     }
360
361   }
362
363   /*
364    * Return a set of valid suggestion attributes for the provided entityName
365    * that are present in the JSON
366    * @param node JSON node in which the attributes should be found
367    * @param entityName Name of the entity
368    * @return List of all valid suggestion attributes(key's)
369    */
370   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
371     List<String> suggestableAttr = new ArrayList<String>();
372     HashMap<String, String> desc =
373         suggestionEntityLookup.getSuggestionSearchEntityOxmModel().get(entityName);
374     String attr = desc.get("suggestibleAttributes");
375     suggestableAttr = Arrays.asList(attr.split(","));
376     List<String> suggestableValue = new ArrayList<>();
377     for (String attribute : suggestableAttr) {
378       if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
379         suggestableValue.add(attribute);
380       }
381     }
382     return suggestableValue;
383   }
384
385   /**
386    * Fetch all the documents for upsert. Based on the number of permutations that are available the
387    * number of documents will be different
388    *
389    * @param txn the txn
390    */
391   private void fetchDocumentForUpsert(NetworkTransaction txn) {
392     if (!txn.getOperationResult().wasSuccessful()) {
393       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
394       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
395       return;
396     }
397     try {
398       final String jsonResult = txn.getOperationResult().getResult();
399
400       if (jsonResult != null && jsonResult.length() > 0) {
401
402         // Step 1: Calculate the number of possible permutations of attributes
403         String entityName = txn.getDescriptor().getEntityName();
404         JsonNode entityNode = mapper.readTree(jsonResult);
405
406         List<String> availableSuggestableAttrName =
407             getSuggestableAttrNamesFromReponse(entityNode, entityName);
408         
409         ArrayList<ArrayList<String>> uniqueLists =
410             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
411         // Now we have a list of all possible permutations for the status that are
412         // defined for this entity type. Try inserting a document for every combination.
413         for (ArrayList<String> uniqueList : uniqueLists) {
414
415           SuggestionSearchEntity sse = new SuggestionSearchEntity(filtersConfig, suggestionEntityLookup);
416           sse.setSuggestableAttr(uniqueList);
417           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
418           sse.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
419           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
420           // The unique id for the document will be created at derive fields
421           sse.deriveFields();
422           // Insert the document only if it has valid statuses
423           if (sse.isSuggestableDoc()) {
424             String link = null;
425             try {
426               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
427             } catch (Exception exc) {
428               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
429             }
430
431             if (link != null) {
432               NetworkTransaction n2 = new NetworkTransaction();
433               n2.setLink(link);
434               n2.setEntityType(txn.getEntityType());
435               n2.setDescriptor(txn.getDescriptor());
436               n2.setOperationType(HttpMethod.GET);
437
438               esWorkOnHand.incrementAndGet();
439
440               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
441                   .whenComplete((result, error) -> {
442
443                     esWorkOnHand.decrementAndGet();
444
445                     if (error != null) {
446                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
447                     } else {
448                       updateElasticSearchCounters(result);
449                       performDocumentUpsert(result, sse);
450                     }
451                   });
452             }
453           }
454         }
455       }
456     } catch (JsonProcessingException exc) {
457       // TODO -> LOG, waht should be logged here?
458     } catch (IOException exc) {
459       // TODO -> LOG, waht should be logged here?
460     }
461   }
462
463   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
464       NetworkTransaction txn) throws JsonProcessingException, IOException {
465
466     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
467
468     sse.setEntityType(resultDescriptor.getEntityName());
469
470     JsonNode entityNode = mapper.readTree(result);
471
472     List<String> primaryKeyValues = new ArrayList<String>();
473     String pkeyValue = null;
474
475     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
476       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
477       if (pkeyValue != null) {
478         primaryKeyValues.add(pkeyValue);
479       } else {
480         String message = "populateSuggestionSearchEntityDocument(),"
481             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
482         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
483       }
484     }
485
486     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
487     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
488     sse.generateSuggestionInputPermutations();
489   }
490
491   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
492     /**
493      * <p>
494      * <ul>
495      * As part of the response processing we need to do the following:
496      * <li>1. Extract the version (if present), it will be the ETAG when we use the
497      * Search-Abstraction-Service
498      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
499      * tag
500      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
501      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
502      * </ul>
503      * </p>
504      */
505     String link = null;
506     try {
507       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
508     } catch (Exception exc) {
509       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
510       return;
511     }
512
513     boolean wasEntryDiscovered = false;
514     if (esGetTxn.getOperationResult().getResultCode() == 404) {
515       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
516     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
517       wasEntryDiscovered = true;
518     } else {
519       /*
520        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
521        * found TODO -> Should we return.
522        */
523       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
524           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
525       return;
526     }
527     // Insert a new document only if the paylod is different.
528     // This is determined by hashing the payload and using it as a id for the document
529     //
530     if (!wasEntryDiscovered) {
531       try {
532         String jsonPayload = null;
533
534         jsonPayload = sse.getAsJson();
535         if (link != null && jsonPayload != null) {
536
537           NetworkTransaction updateElasticTxn = new NetworkTransaction();
538           updateElasticTxn.setLink(link);
539           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
540           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
541           updateElasticTxn.setOperationType(HttpMethod.PUT);
542
543           esWorkOnHand.incrementAndGet();
544           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
545               esPutExecutor).whenComplete((result, error) -> {
546
547                 esWorkOnHand.decrementAndGet();
548
549                 if (error != null) {
550                   String message = "Suggestion search entity sync UPDATE PUT error - "
551                       + error.getLocalizedMessage();
552                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
553                 } else {
554                   updateElasticSearchCounters(result);
555                   processStoreDocumentResult(result, esGetTxn, sse);
556                 }
557               });
558         }
559       } catch (Exception exc) {
560         String message =
561             "Exception caught during suggestion search entity sync PUT operation. Message - "
562                 + exc.getLocalizedMessage();
563         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
564       }
565     }
566   }
567
568   private void processStoreDocumentResult(NetworkTransaction esPutResult,
569       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
570
571     OperationResult or = esPutResult.getOperationResult();
572
573     if (!or.wasSuccessful()) {
574       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
575
576         if (shouldAllowRetry(sse.getId())) {
577           esWorkOnHand.incrementAndGet();
578
579           RetrySuggestionEntitySyncContainer rssec =
580               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
581           retryQueue.push(rssec);
582
583           String message = "Store document failed during suggestion search entity synchronization"
584               + " due to version conflict. Entity will be re-synced.";
585           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
586         }
587       } else {
588         String message =
589             "Store document failed during suggestion search entity synchronization with result code "
590                 + or.getResultCode() + " and result message " + or.getResult();
591         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
592       }
593     }
594   }
595
596   /**
597    * Perform retry sync.
598    */
599   private void performRetrySync() {
600     while (retryQueue.peek() != null) {
601
602       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
603       if (susc != null) {
604
605         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
606         NetworkTransaction txn = susc.getNetworkTransaction();
607
608         String link = null;
609         try {
610           /*
611            * In this retry flow the se object has already derived its fields
612            */
613           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
614         } catch (Exception exc) {
615           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
616         }
617
618         if (link != null) {
619           NetworkTransaction retryTransaction = new NetworkTransaction();
620           retryTransaction.setLink(link);
621           retryTransaction.setEntityType(txn.getEntityType());
622           retryTransaction.setDescriptor(txn.getDescriptor());
623           retryTransaction.setOperationType(HttpMethod.GET);
624
625           /*
626            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
627            * called incrementAndGet when queuing the failed PUT!
628            */
629
630           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
631               esExecutor).whenComplete((result, error) -> {
632
633                 esWorkOnHand.decrementAndGet();
634
635                 if (error != null) {
636                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
637                 } else {
638                   updateElasticSearchCounters(result);
639                   performDocumentUpsert(result, sus);
640                 }
641               });
642         }
643
644       }
645     }
646   }
647
648   /**
649    * Should allow retry.
650    *
651    * @param id the id
652    * @return true, if successful
653    */
654   private boolean shouldAllowRetry(String id) {
655     boolean isRetryAllowed = true;
656     if (retryLimitTracker.get(id) != null) {
657       Integer currentCount = retryLimitTracker.get(id);
658       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
659         isRetryAllowed = false;
660         String message = "Searchable entity re-sync limit reached for " + id
661             + ", re-sync will no longer be attempted for this entity";
662         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
663       } else {
664         Integer newCount = new Integer(currentCount.intValue() + 1);
665         retryLimitTracker.put(id, newCount);
666       }
667     } else {
668       Integer firstRetryCount = new Integer(1);
669       retryLimitTracker.put(id, firstRetryCount);
670     }
671
672     return isRetryAllowed;
673   }
674
675
676
677   @Override
678   public SynchronizerState getState() {
679
680     if (!isSyncDone()) {
681       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
682     }
683
684     return SynchronizerState.IDLE;
685
686   }
687
688   /*
689    * (non-Javadoc)
690    * 
691    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
692    */
693   @Override
694   public String getStatReport(boolean showFinalReport) {
695     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
696     return getStatReport(syncDurationInMs, showFinalReport);
697   }
698
699   /*
700    * (non-Javadoc)
701    * 
702    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
703    */
704   @Override
705   public void shutdown() {
706     this.shutdownExecutors();
707   }
708
709   @Override
710   protected boolean isSyncDone() {
711
712     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
713
714     if (LOG.isDebugEnabled()) {
715       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
716           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
717     }
718
719     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
720       return false;
721     }
722
723     this.syncInProgress = false;
724
725     return true;
726   }
727
728   /*
729    * (non-Javadoc)
730    * 
731    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
732    */
733   @Override
734   public void clearCache() {
735
736     if (syncInProgress) {
737       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
738           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
739       return;
740     }
741
742     super.clearCache();
743     this.resetCounters();
744     if (entityCounters != null) {
745       entityCounters.clear();
746     }
747
748     allWorkEnumerated = false;
749
750   }
751
752 }