5172fe82040b4f3503ff4af64ac59cb6baa5d21c
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / AutosuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.autosuggestion.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Deque;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentLinkedDeque;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Supplier;
42
43 import org.onap.aai.cl.api.Logger;
44 import org.onap.aai.cl.eelf.LoggerFactory;
45 import org.onap.aai.cl.mdc.MdcContext;
46 import org.onap.aai.restclient.client.OperationResult;
47 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
48 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
49 import org.onap.aai.sparky.config.oxm.SuggestionEntityDescriptor;
50 import org.onap.aai.sparky.config.oxm.SuggestionEntityLookup;
51 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
52 import org.onap.aai.sparky.dal.NetworkTransaction;
53 import org.onap.aai.sparky.dal.rest.HttpMethod;
54 import org.onap.aai.sparky.logging.AaiUiMsgs;
55 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
56 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
57 import org.onap.aai.sparky.sync.IndexSynchronizer;
58 import org.onap.aai.sparky.sync.SynchronizerConstants;
59 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
60 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
61 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
62 import org.onap.aai.sparky.sync.entity.SuggestionSearchEntity;
63 import org.onap.aai.sparky.sync.enumeration.OperationState;
64 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
65 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
66 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
67 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
68 import org.onap.aai.sparky.util.NodeUtils;
69 import org.onap.aai.sparky.util.SuggestionsPermutation;
70 import org.slf4j.MDC;
71
72 import com.fasterxml.jackson.core.JsonProcessingException;
73 import com.fasterxml.jackson.databind.JsonNode;
74 import com.fasterxml.jackson.databind.node.ArrayNode;
75
76 /**
77  * The Class AutosuggestionSynchronizer.
78  */
79 public class AutosuggestionSynchronizer extends AbstractEntitySynchronizer
80     implements IndexSynchronizer {
81
82   private class RetrySuggestionEntitySyncContainer {
83     NetworkTransaction txn;
84     SuggestionSearchEntity ssec;
85
86     /**
87      * Instantiates a new RetrySuggestionEntitySyncContainer.
88      *
89      * @param txn the txn
90      * @param icer the icer
91      */
92     public RetrySuggestionEntitySyncContainer(NetworkTransaction txn, SuggestionSearchEntity icer) {
93       this.txn = txn;
94       this.ssec = icer;
95     }
96
97     public NetworkTransaction getNetworkTransaction() {
98       return txn;
99     }
100
101     public SuggestionSearchEntity getSuggestionSearchEntity() {
102       return ssec;
103     }
104   }
105
106   private static final Logger LOG =
107       LoggerFactory.getInstance().getLogger(AutosuggestionSynchronizer.class);
108   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
109
110   private boolean allWorkEnumerated;
111   private Deque<SelfLinkDescriptor> selflinks;
112   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
113   private boolean syncInProgress;
114   private Map<String, String> contextMap;
115   protected ExecutorService esPutExecutor;
116   private Deque<RetrySuggestionEntitySyncContainer> retryQueue;
117   private Map<String, Integer> retryLimitTracker;
118   private OxmEntityLookup oxmEntityLookup;
119   private SuggestionEntityLookup suggestionEntityLookup;
120   private FiltersConfig filtersConfig;
121
122   /**
123    * Instantiates a new historical entity summarizer.
124    *
125    * @param indexName the index name
126    * @throws Exception the exception
127    */
128   public AutosuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
129       int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
130       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
131       SuggestionEntityLookup suggestionEntityLookup, FiltersConfig filtersConfig) throws Exception {
132
133     super(LOG, "ASES-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
134         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
135     
136     this.oxmEntityLookup = oxmEntityLookup;
137     this.suggestionEntityLookup = suggestionEntityLookup;
138     this.allWorkEnumerated = false;
139     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
140     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
141     this.synchronizerName = "Autosuggestion Entity Synchronizer";
142     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
143     this.syncInProgress = false;
144     this.contextMap = MDC.getCopyOfContextMap();
145     this.esPutExecutor = NodeUtils.createNamedExecutor("SUES-ES-PUT", 5, LOG);
146     this.retryQueue = new ConcurrentLinkedDeque<RetrySuggestionEntitySyncContainer>();
147     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
148     this.syncDurationInMs = -1;
149     this.filtersConfig = filtersConfig;
150   }
151
152   /**
153    * Collect all the work.
154    *
155    * @return the operation state
156    */
157   private OperationState collectAllTheWork() {
158     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
159     Map<String, SuggestionEntityDescriptor> descriptorMap =
160         suggestionEntityLookup.getSuggestionSearchEntityDescriptors();
161
162     if (descriptorMap.isEmpty()) {
163       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
164       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SUGGESTIBLE_ENTITIES);
165       return OperationState.ERROR;
166     }
167
168     Collection<String> syncTypes = descriptorMap.keySet();
169
170     try {
171
172       /*
173        * launch a parallel async thread to process the documents for each entity-type (to max the of
174        * the configured executor anyway)
175        */
176
177       aaiWorkOnHand.set(syncTypes.size());
178
179       for (String key : syncTypes) {
180
181         supplyAsync(new Supplier<Void>() {
182
183           @Override
184           public Void get() {
185             MDC.setContextMap(contextMap);
186             OperationResult typeLinksResult = null;
187             try {
188               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
189               aaiWorkOnHand.decrementAndGet();
190               processEntityTypeSelfLinks(typeLinksResult);
191             } catch (Exception exc) {
192               LOG.error(AaiUiMsgs.ERROR_GENERIC,
193                   "An error occurred while processing entity self-links. Error: "
194                       + exc.getMessage());
195             }
196
197             return null;
198           }
199
200         }, aaiExecutor).whenComplete((result, error) -> {
201
202           if (error != null) {
203             LOG.error(AaiUiMsgs.ERROR_GENERIC,
204                 "An error occurred getting data from AAI. Error = " + error.getMessage());
205           }
206         });
207
208       }
209
210       while (aaiWorkOnHand.get() != 0) {
211
212         if (LOG.isDebugEnabled()) {
213           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
214         }
215
216         Thread.sleep(1000);
217       }
218
219       aaiWorkOnHand.set(selflinks.size());
220       allWorkEnumerated = true;
221       syncEntityTypes();
222
223       while (!isSyncDone()) {
224         performRetrySync();
225         Thread.sleep(1000);
226       }
227
228       /*
229        * Make sure we don't hang on to retries that failed which could cause issues during future
230        * syncs
231        */
232       retryLimitTracker.clear();
233
234     } catch (Exception exc) {
235       LOG.error(AaiUiMsgs.ERROR_GENERIC,
236           "An error occurred while performing the sync.  Error: " + exc.getMessage());
237     }
238
239     return OperationState.OK;
240
241   }
242
243   /*
244    * (non-Javadoc)
245    * 
246    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
247    */
248   @Override
249   public OperationState doSync() {
250     this.syncDurationInMs = -1;
251     syncStartedTimeStampInMs = System.currentTimeMillis();
252     String txnID = NodeUtils.getRandomTxnId();
253     MdcContext.initialize(txnID, "AutosuggestionSynchronizer", "", "Sync", "");
254
255     return collectAllTheWork();
256   }
257
258   /**
259    * Process entity type self links.
260    *
261    * @param operationResult the operation result
262    */
263   private void processEntityTypeSelfLinks(OperationResult operationResult) {
264
265     JsonNode rootNode = null;
266     
267     if ( operationResult == null ) {
268         return;
269     }
270
271     final String jsonResult = operationResult.getResult();
272
273     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
274
275       try {
276         rootNode = mapper.readTree(jsonResult);
277       } catch (IOException exc) {
278         String message = "Could not deserialize JSON (representing operation result) as node tree. "
279             + "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
280         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
281       }
282
283       JsonNode resultData = rootNode.get("result-data");
284       ArrayNode resultDataArrayNode = null;
285
286       if (resultData.isArray()) {
287         resultDataArrayNode = (ArrayNode) resultData;
288
289         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
290         JsonNode element = null;
291
292         while (elementIterator.hasNext()) {
293           element = elementIterator.next();
294
295           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
296           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
297
298           OxmEntityDescriptor descriptor = null;
299
300           if (resourceType != null && resourceLink != null) {
301
302             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
303
304             if (descriptor == null) {
305               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
306               // go to next element in iterator
307               continue;
308             }
309             selflinks.add(new SelfLinkDescriptor(resourceLink,
310                 SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
311
312
313           }
314         }
315       }
316     }
317   }
318
319   /**
320    * Sync entity types.
321    */
322   private void syncEntityTypes() {
323
324     while (selflinks.peek() != null) {
325
326       SelfLinkDescriptor linkDescriptor = selflinks.poll();
327       aaiWorkOnHand.decrementAndGet();
328
329       OxmEntityDescriptor descriptor = null;
330
331       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
332
333         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
334
335         if (descriptor == null) {
336           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
337           // go to next element in iterator
338           continue;
339         }
340
341         NetworkTransaction txn = new NetworkTransaction();
342         txn.setDescriptor(descriptor);
343         txn.setLink(linkDescriptor.getSelfLink());
344         txn.setOperationType(HttpMethod.GET);
345         txn.setEntityType(linkDescriptor.getEntityType());
346
347         aaiWorkOnHand.incrementAndGet();
348
349         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
350             .whenComplete((result, error) -> {
351
352               aaiWorkOnHand.decrementAndGet();
353
354               if (error != null) {
355                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
356               } else {
357                 if (result == null) {
358                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
359                       linkDescriptor.getSelfLink());
360                 } else {
361                   updateActiveInventoryCounters(result);
362                   fetchDocumentForUpsert(result);
363                 }
364               }
365             });
366       }
367
368     }
369
370   }
371
372   /*
373    * Return a set of valid suggestion attributes for the provided entityName that are present in the
374    * JSON
375    * 
376    * @param node JSON node in which the attributes should be found
377    * 
378    * @param entityName Name of the entity
379    * 
380    * @return List of all valid suggestion attributes(key's)
381    */
382   public List<String> getSuggestableAttrNamesFromReponse(JsonNode node, String entityName) {
383     List<String> suggestableAttr = new ArrayList<String>();
384
385     HashMap<String, String> desc =
386         suggestionEntityLookup.getSuggestionSearchEntityOxmModel().get(entityName);
387
388     if (desc != null) {
389
390       String attr = desc.get("suggestibleAttributes");
391
392       if (attr != null) {
393         suggestableAttr = Arrays.asList(attr.split(","));
394         List<String> suggestableValue = new ArrayList<String>();
395         for (String attribute : suggestableAttr) {
396           if (node.get(attribute) != null && node.get(attribute).asText().length() > 0) {
397             suggestableValue.add(attribute);
398           }
399         }
400         return suggestableValue;
401       }
402     }
403
404     return new ArrayList<String>();
405   }
406
407   /**
408    * Fetch all the documents for upsert. Based on the number of permutations that are available the
409    * number of documents will be different
410    *
411    * @param txn the txn
412    */
413   private void fetchDocumentForUpsert(NetworkTransaction txn) {
414     if (!txn.getOperationResult().wasSuccessful()) {
415       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
416       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
417       return;
418     }
419     try {
420       final String jsonResult = txn.getOperationResult().getResult();
421
422       if (jsonResult != null && jsonResult.length() > 0) {
423
424         // Step 1: Calculate the number of possible permutations of attributes
425         String entityName = txn.getDescriptor().getEntityName();
426         JsonNode entityNode = mapper.readTree(jsonResult);
427
428         List<String> availableSuggestableAttrName =
429             getSuggestableAttrNamesFromReponse(entityNode, entityName);
430         
431         ArrayList<ArrayList<String>> uniqueLists =
432             SuggestionsPermutation.getNonEmptyUniqueLists(availableSuggestableAttrName);
433         // Now we have a list of all possible permutations for the status that are
434         // defined for this entity type. Try inserting a document for every combination.
435         for (ArrayList<String> uniqueList : uniqueLists) {
436
437           SuggestionSearchEntity sse = new SuggestionSearchEntity(filtersConfig, suggestionEntityLookup);
438           sse.setSuggestableAttr(uniqueList);
439           sse.setFilterBasedPayloadFromResponse(entityNode, entityName, uniqueList);
440           sse.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
441           populateSuggestionSearchEntityDocument(sse, jsonResult, txn);
442           // The unique id for the document will be created at derive fields
443           sse.deriveFields();
444           // Insert the document only if it has valid statuses
445           if (sse.isSuggestableDoc()) {
446             String link = null;
447             try {
448               link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
449             } catch (Exception exc) {
450               LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
451             }
452
453             if (link != null) {
454               NetworkTransaction n2 = new NetworkTransaction();
455               n2.setLink(link);
456               n2.setEntityType(txn.getEntityType());
457               n2.setDescriptor(txn.getDescriptor());
458               n2.setOperationType(HttpMethod.GET);
459
460               esWorkOnHand.incrementAndGet();
461
462               supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
463                   .whenComplete((result, error) -> {
464
465                     esWorkOnHand.decrementAndGet();
466
467                     if (error != null) {
468                       LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
469                     } else {
470                       updateElasticSearchCounters(result);
471                       performDocumentUpsert(result, sse);
472                     }
473                   });
474             }
475           }
476         }
477       }
478     } catch (JsonProcessingException exc) {
479         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a json processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
480     } catch (IOException exc) {
481         LOG.error(AaiUiMsgs.ERROR_GENERIC, "There was a io processing error while processing the result from elasticsearch. Error: " + exc.getMessage());
482     }
483   }
484
485   protected void populateSuggestionSearchEntityDocument(SuggestionSearchEntity sse, String result,
486       NetworkTransaction txn) throws JsonProcessingException, IOException {
487
488     OxmEntityDescriptor resultDescriptor = txn.getDescriptor();
489
490     sse.setEntityType(resultDescriptor.getEntityName());
491
492     JsonNode entityNode = mapper.readTree(result);
493
494     List<String> primaryKeyValues = new ArrayList<String>();
495     String pkeyValue = null;
496
497     for (String keyName : resultDescriptor.getPrimaryKeyAttributeNames()) {
498       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
499       if (pkeyValue != null) {
500         primaryKeyValues.add(pkeyValue);
501       } else {
502         String message = "populateSuggestionSearchEntityDocument(),"
503             + " pKeyValue is null for entityType = " + resultDescriptor.getEntityName();
504         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
505       }
506     }
507
508     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
509     sse.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
510     sse.generateSuggestionInputPermutations();
511   }
512
513   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SuggestionSearchEntity sse) {
514     /**
515      * <p>
516      * <ul>
517      * As part of the response processing we need to do the following:
518      * <li>1. Extract the version (if present), it will be the ETAG when we use the
519      * Search-Abstraction-Service
520      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
521      * tag
522      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
523      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
524      * </ul>
525      * </p>
526      */
527     String link = null;
528     try {
529       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sse.getId());
530     } catch (Exception exc) {
531       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
532       return;
533     }
534
535     boolean wasEntryDiscovered = false;
536     if (esGetTxn.getOperationResult().getResultCode() == 404) {
537       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, sse.getEntityPrimaryKeyValue());
538     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
539       wasEntryDiscovered = true;
540     } else {
541       /*
542        * Not being a 200 does not mean a failure. eg 201 is returned for created. and 500 for es not
543        * found TODO -> Should we return.
544        */
545       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
546           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
547       return;
548     }
549     // Insert a new document only if the paylod is different.
550     // This is determined by hashing the payload and using it as a id for the document
551     //
552     if (!wasEntryDiscovered) {
553       try {
554         String jsonPayload = null;
555
556         jsonPayload = sse.getAsJson();
557         if (link != null && jsonPayload != null) {
558
559           NetworkTransaction updateElasticTxn = new NetworkTransaction();
560           updateElasticTxn.setLink(link);
561           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
562           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
563           updateElasticTxn.setOperationType(HttpMethod.PUT);
564
565           esWorkOnHand.incrementAndGet();
566           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
567               esPutExecutor).whenComplete((result, error) -> {
568
569                 esWorkOnHand.decrementAndGet();
570
571                 if (error != null) {
572                   String message = "Suggestion search entity sync UPDATE PUT error - "
573                       + error.getLocalizedMessage();
574                   LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
575                 } else {
576                   updateElasticSearchCounters(result);
577                   processStoreDocumentResult(result, esGetTxn, sse);
578                 }
579               });
580         }
581       } catch (Exception exc) {
582         String message =
583             "Exception caught during suggestion search entity sync PUT operation. Message - "
584                 + exc.getLocalizedMessage();
585         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
586       }
587     }
588   }
589
590   private void processStoreDocumentResult(NetworkTransaction esPutResult,
591       NetworkTransaction esGetResult, SuggestionSearchEntity sse) {
592
593     OperationResult or = esPutResult.getOperationResult();
594
595     if (!or.wasSuccessful()) {
596       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
597
598         if (shouldAllowRetry(sse.getId())) {
599           esWorkOnHand.incrementAndGet();
600
601           RetrySuggestionEntitySyncContainer rssec =
602               new RetrySuggestionEntitySyncContainer(esGetResult, sse);
603           retryQueue.push(rssec);
604
605           String message = "Store document failed during suggestion search entity synchronization"
606               + " due to version conflict. Entity will be re-synced.";
607           LOG.warn(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
608         }
609       } else {
610         String message =
611             "Store document failed during suggestion search entity synchronization with result code "
612                 + or.getResultCode() + " and result message " + or.getResult();
613         LOG.error(AaiUiMsgs.ES_SUGGESTION_SEARCH_ENTITY_SYNC_ERROR, message);
614       }
615     }
616   }
617
618   /**
619    * Perform retry sync.
620    */
621   private void performRetrySync() {
622     while (retryQueue.peek() != null) {
623
624       RetrySuggestionEntitySyncContainer susc = retryQueue.poll();
625       if (susc != null) {
626
627         SuggestionSearchEntity sus = susc.getSuggestionSearchEntity();
628         NetworkTransaction txn = susc.getNetworkTransaction();
629
630         String link = null;
631         try {
632           /*
633            * In this retry flow the se object has already derived its fields
634            */
635           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), sus.getId());
636         } catch (Exception exc) {
637           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
638         }
639
640         if (link != null) {
641           NetworkTransaction retryTransaction = new NetworkTransaction();
642           retryTransaction.setLink(link);
643           retryTransaction.setEntityType(txn.getEntityType());
644           retryTransaction.setDescriptor(txn.getDescriptor());
645           retryTransaction.setOperationType(HttpMethod.GET);
646
647           /*
648            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
649            * called incrementAndGet when queuing the failed PUT!
650            */
651
652           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
653               esExecutor).whenComplete((result, error) -> {
654
655                 esWorkOnHand.decrementAndGet();
656
657                 if (error != null) {
658                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
659                 } else {
660                   updateElasticSearchCounters(result);
661                   performDocumentUpsert(result, sus);
662                 }
663               });
664         }
665
666       }
667     }
668   }
669
670   /**
671    * Should allow retry.
672    *
673    * @param id the id
674    * @return true, if successful
675    */
676   private boolean shouldAllowRetry(String id) {
677     boolean isRetryAllowed = true;
678     if (retryLimitTracker.get(id) != null) {
679       Integer currentCount = retryLimitTracker.get(id);
680       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
681         isRetryAllowed = false;
682         String message = "Searchable entity re-sync limit reached for " + id
683             + ", re-sync will no longer be attempted for this entity";
684         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
685       } else {
686         Integer newCount = new Integer(currentCount.intValue() + 1);
687         retryLimitTracker.put(id, newCount);
688       }
689     } else {
690       Integer firstRetryCount = new Integer(1);
691       retryLimitTracker.put(id, firstRetryCount);
692     }
693
694     return isRetryAllowed;
695   }
696
697
698
699   @Override
700   public SynchronizerState getState() {
701
702     if (!isSyncDone()) {
703       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
704     }
705
706     return SynchronizerState.IDLE;
707
708   }
709
710   /*
711    * (non-Javadoc)
712    * 
713    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
714    */
715   @Override
716   public String getStatReport(boolean showFinalReport) {
717     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
718     return getStatReport(syncDurationInMs, showFinalReport);
719   }
720
721   /*
722    * (non-Javadoc)
723    * 
724    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
725    */
726   @Override
727   public void shutdown() {
728     this.shutdownExecutors();
729   }
730
731   @Override
732   protected boolean isSyncDone() {
733
734     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
735
736     if (LOG.isDebugEnabled()) {
737       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
738           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
739     }
740
741     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
742       return false;
743     }
744
745     this.syncInProgress = false;
746
747     return true;
748   }
749
750   /*
751    * (non-Javadoc)
752    * 
753    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
754    */
755   @Override
756   public void clearCache() {
757
758     if (syncInProgress) {
759       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
760           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
761       return;
762     }
763
764     super.clearCache();
765     this.resetCounters();
766     if (entityCounters != null) {
767       entityCounters.clear();
768     }
769
770     allWorkEnumerated = false;
771
772   }
773
774 }