update sync queries to use searh data service
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / viewinspect / sync / ViewInspectEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.viewinspect.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.ExecutorService;
35 import java.util.function.Consumer;
36 import java.util.function.Supplier;
37
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.aai.cl.mdc.MdcContext;
41 import org.onap.aai.restclient.client.OperationResult;
42 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
44 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
45 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
46 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
47 import org.onap.aai.sparky.dal.NetworkTransaction;
48 import org.onap.aai.sparky.dal.rest.HttpMethod;
49 import org.onap.aai.sparky.logging.AaiUiMsgs;
50 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
51 import org.onap.aai.sparky.sync.IndexSynchronizer;
52 import org.onap.aai.sparky.sync.SynchronizerConstants;
53 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
54 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
55 import org.onap.aai.sparky.sync.entity.MergableEntity;
56 import org.onap.aai.sparky.sync.entity.SearchableEntity;
57 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
58 import org.onap.aai.sparky.sync.enumeration.OperationState;
59 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
60 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
61 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
62 import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
63 import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
64 import org.onap.aai.sparky.util.NodeUtils;
65 import org.slf4j.MDC;
66
67 import com.fasterxml.jackson.core.JsonProcessingException;
68 import com.fasterxml.jackson.databind.JsonNode;
69 import com.fasterxml.jackson.databind.ObjectReader;
70 import com.fasterxml.jackson.databind.node.ArrayNode;
71
72 /**
73  * The Class SearchableEntitySynchronizer.
74  */
75 public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
76     implements IndexSynchronizer {
77
78   /**
79    * The Class RetrySearchableEntitySyncContainer.
80    */
81   private class RetrySearchableEntitySyncContainer {
82     NetworkTransaction txn;
83     SearchableEntity se;
84
85     /**
86      * Instantiates a new retry searchable entity sync container.
87      *
88      * @param txn the txn
89      * @param se the se
90      */
91     public RetrySearchableEntitySyncContainer(NetworkTransaction txn, SearchableEntity se) {
92       this.txn = txn;
93       this.se = se;
94     }
95
96     public NetworkTransaction getNetworkTransaction() {
97       return txn;
98     }
99
100     public SearchableEntity getSearchableEntity() {
101       return se;
102     }
103   }
104
105   private static final Logger LOG =
106       LoggerFactory.getInstance().getLogger(ViewInspectEntitySynchronizer.class);
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private Deque<RetrySearchableEntitySyncContainer> retryQueue;
111   private Map<String, Integer> retryLimitTracker;
112   protected ExecutorService esPutExecutor;
113   private OxmEntityLookup oxmEntityLookup;
114   private SearchableEntityLookup searchableEntityLookup;
115
116   /**
117    * Instantiates a new searchable entity synchronizer.
118    *
119    * @throws Exception the exception
120    */
121   public ViewInspectEntitySynchronizer(ElasticSearchSchemaConfig schemaConfig,
122       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
123       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
124       SearchableEntityLookup searchableEntityLookup) throws Exception {
125     super(LOG, "SES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
126         aaiStatConfig, esStatConfig);
127     
128     this.oxmEntityLookup = oxmEntityLookup;
129     this.searchableEntityLookup = searchableEntityLookup;
130     this.allWorkEnumerated = false;
131     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
132     this.retryQueue = new ConcurrentLinkedDeque<RetrySearchableEntitySyncContainer>();
133     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
134     this.synchronizerName = "Searchable Entity Synchronizer";
135     this.esPutExecutor = NodeUtils.createNamedExecutor("SES-ES-PUT", 5, LOG);
136     this.aaiEntityStats.intializeEntityCounters(
137         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
138     this.esEntityStats.intializeEntityCounters(
139         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
140     this.syncDurationInMs = -1;
141   }
142
143   /**
144    * Collect all the work.
145    *
146    * @return the operation state
147    */
148   private OperationState collectAllTheWork() {
149     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
150     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
151         searchableEntityLookup.getSearchableEntityDescriptors();
152     
153     if (descriptorMap.isEmpty()) {
154       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
155       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
156       return OperationState.ERROR;
157     }
158
159     Collection<String> syncTypes = descriptorMap.keySet();
160
161     /*Collection<String> syncTypes = new ArrayList<String>();
162     syncTypes.add("service-instance");*/
163
164     try {
165
166       /*
167        * launch a parallel async thread to process the documents for each entity-type (to max the
168        * of the configured executor anyway)
169        */
170
171       aaiWorkOnHand.set(syncTypes.size());
172
173       for (String key : syncTypes) {
174
175         supplyAsync(new Supplier<Void>() {
176
177           @Override
178           public Void get() {
179             MDC.setContextMap(contextMap);
180             OperationResult typeLinksResult = null;
181             try {
182               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
183               System.out.println(typeLinksResult);
184               aaiWorkOnHand.decrementAndGet();
185               processEntityTypeSelfLinks(typeLinksResult);
186             } catch (Exception exc) {
187               // TODO -> LOG, what should be logged here?
188             }
189
190             return null;
191           }
192
193         }, aaiExecutor).whenComplete((result, error) -> {
194
195           if (error != null) {
196             LOG.error(AaiUiMsgs.ERROR_GENERIC,
197                 "An error occurred getting data from AAI. Error = " + error.getMessage());
198           }
199         });
200
201       }
202
203       while (aaiWorkOnHand.get() != 0) {
204
205         if (LOG.isDebugEnabled()) {
206           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
207         }
208
209         Thread.sleep(1000);
210       }
211
212       aaiWorkOnHand.set(selflinks.size());
213       allWorkEnumerated = true;
214       syncEntityTypes();
215
216       while (!isSyncDone()) {
217         performRetrySync();
218         Thread.sleep(1000);
219       }
220
221       /*
222        * Make sure we don't hang on to retries that failed which could cause issues during future
223        * syncs
224        */
225       retryLimitTracker.clear();
226
227     } catch (Exception exc) {
228       // TODO -> LOG, waht should be logged here?
229     }
230
231     return OperationState.OK;
232   }
233
234   /* (non-Javadoc)
235    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
236    */
237   @Override
238   public OperationState doSync() {
239     this.syncDurationInMs = -1;
240     String txnID = NodeUtils.getRandomTxnId();
241     MdcContext.initialize(txnID, "SearchableEntitySynchronizer", "", "Sync", "");
242     
243     resetCounters();
244     this.allWorkEnumerated = false;
245     syncStartedTimeStampInMs = System.currentTimeMillis();
246     collectAllTheWork();
247
248     return OperationState.OK;
249   }
250
251   /**
252    * Process entity type self links.
253    *
254    * @param operationResult the operation result
255    */
256   private void processEntityTypeSelfLinks(OperationResult operationResult) {
257
258     final String jsonResult = operationResult.getResult();
259
260     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
261
262       try {
263         JsonNode rootNode = mapper.readTree(jsonResult);
264         JsonNode resultData = rootNode.get("result-data");
265
266         if (resultData.isArray()) {
267           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
268
269           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
270
271           while (elementIterator.hasNext()) {
272             JsonNode element = elementIterator.next();
273
274             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
275             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
276
277             SearchableOxmEntityDescriptor descriptor = null;
278
279             if (resourceType != null && resourceLink != null) {
280
281               descriptor = searchableEntityLookup.getSearchableEntityDescriptors().get(resourceType);
282
283               if (descriptor == null) {
284                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
285                 // go to next element in iterator
286                 continue;
287               }
288
289               if (descriptor.hasSearchableAttributes()) {
290                 selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
291               }
292
293             }
294           }
295         }
296       } catch (IOException exc) {
297         String message =
298             "Could not deserialize JSON (representing operation result) as node tree. " +
299             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
300         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
301       }
302     }
303
304   }
305
306   /**
307    * Sync entity types.
308    */
309   private void syncEntityTypes() {
310
311     while (selflinks.peek() != null) {
312
313       SelfLinkDescriptor linkDescriptor = selflinks.poll();
314       aaiWorkOnHand.decrementAndGet();
315
316       OxmEntityDescriptor descriptor = null;
317
318       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
319
320         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
321
322         if (descriptor == null) {
323           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
324           // go to next element in iterator
325           continue;
326         }
327
328         NetworkTransaction txn = new NetworkTransaction();
329         txn.setDescriptor(descriptor);
330         txn.setLink(linkDescriptor.getSelfLink());
331         txn.setOperationType(HttpMethod.GET);
332         txn.setEntityType(linkDescriptor.getEntityType());
333
334         aaiWorkOnHand.incrementAndGet();
335
336         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
337             .whenComplete((result, error) -> {
338
339               aaiWorkOnHand.decrementAndGet();
340
341               if (error != null) {
342                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
343               } else {
344                 if (result == null) {
345                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
346                       linkDescriptor.getSelfLink());
347                 } else {
348                   updateActiveInventoryCounters(result);
349                   fetchDocumentForUpsert(result);
350                 }
351               }
352             });
353       }
354
355     }
356
357   }
358
359   /**
360    * Perform document upsert.
361    *
362    * @param esGetTxn the es get txn
363    * @param se the se
364    */
365   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SearchableEntity se) {
366     /**
367      * <p>
368      * <ul>
369      * As part of the response processing we need to do the following:
370      * <li>1. Extract the version (if present), it will be the ETAG when we use the
371      * Search-Abstraction-Service
372      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
373      * tag
374      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
375      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
376      * </ul>
377      * </p>
378      */
379     String link = null;
380     try {
381       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
382     } catch (Exception exc) {
383       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
384       return;
385     }
386
387     String versionNumber = null;
388     boolean wasEntryDiscovered = false;
389     if (esGetTxn.getOperationResult().getResultCode() == 404) {
390       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, se.getEntityPrimaryKeyValue());
391     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
392       wasEntryDiscovered = true;
393       try {
394         versionNumber = NodeUtils.extractFieldValueFromObject(
395             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
396             "_version");
397       } catch (IOException exc) {
398         String message =
399             "Error extracting version number from response, aborting searchable entity sync of "
400                 + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
401         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
402         return;
403       }
404     } else {
405       /*
406        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
407        * return.
408        */
409       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
410           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
411       return;
412     }
413
414     try {
415       String jsonPayload = null;
416       if (wasEntryDiscovered) {
417         try {
418           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
419           NodeUtils.extractObjectsByKey(
420               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
421               "_source", sourceObject);
422
423           if (!sourceObject.isEmpty()) {
424             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
425             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
426             ObjectReader updater = mapper.readerForUpdating(me);
427             MergableEntity merged = updater.readValue(NodeUtils.convertObjectToJson(se,false));
428             jsonPayload = mapper.writeValueAsString(merged);
429           }
430         } catch (IOException exc) {
431           String message =
432               "Error extracting source value from response, aborting searchable entity sync of "
433                   + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
434           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
435           return;
436         }
437       } else {
438           jsonPayload = se.getAsJson();
439       }
440
441       if (wasEntryDiscovered) {
442         if (versionNumber != null && jsonPayload != null) {
443
444                 String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
445                     se.getId(), jsonPayload);
446
447           NetworkTransaction transactionTracker = new NetworkTransaction();
448           transactionTracker.setEntityType(esGetTxn.getEntityType());
449           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
450           transactionTracker.setOperationType(HttpMethod.PUT);
451
452           esWorkOnHand.incrementAndGet();
453           supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
454                   requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
455                       .whenComplete((result, error) -> {
456
457                     esWorkOnHand.decrementAndGet();
458
459                     if (error != null) {
460                       String message = "Searchable entity sync UPDATE PUT error - "
461                           + error.getLocalizedMessage();
462                       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
463                     } else {
464                       updateElasticSearchCounters(result);
465                       processStoreDocumentResult(result, esGetTxn, se);
466                     }
467                   });
468         }
469
470       } else {
471         
472         if (link != null && jsonPayload != null) {
473
474           NetworkTransaction updateElasticTxn = new NetworkTransaction();
475           updateElasticTxn.setLink(link);
476           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
477           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
478           updateElasticTxn.setOperationType(HttpMethod.PUT);
479
480           esWorkOnHand.incrementAndGet();
481           supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
482                   esPutExecutor).whenComplete((result, error) -> {
483
484                 esWorkOnHand.decrementAndGet();
485
486                 if (error != null) {
487                   String message =
488                       "Searchable entity sync UPDATE PUT error - " + error.getLocalizedMessage();
489                   LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
490                 } else {
491                   updateElasticSearchCounters(result);
492                   processStoreDocumentResult(result, esGetTxn, se);
493                 }
494               });
495         }
496       }
497     } catch (Exception exc) {
498       String message = "Exception caught during searchable entity sync PUT operation. Message - "
499           + exc.getLocalizedMessage();
500       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
501     }
502   }
503
504   /**
505    * Populate searchable entity document.
506    *
507    * @param doc the doc
508    * @param result the result
509    * @param resultDescriptor the result descriptor
510    * @throws JsonProcessingException the json processing exception
511    * @throws IOException Signals that an I/O exception has occurred.
512    */
513   protected void populateSearchableEntityDocument(SearchableEntity doc, String result,
514       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
515
516     doc.setEntityType(resultDescriptor.getEntityName());
517
518     JsonNode entityNode = mapper.readTree(result);
519
520     List<String> primaryKeyValues = new ArrayList<String>();
521     String pkeyValue = null;
522
523     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup.getSearchableEntityDescriptors().get(resultDescriptor.getEntityName());
524     
525     for (String keyName : searchableDescriptor.getPrimaryKeyAttributeNames()) {
526       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
527       if (pkeyValue != null) {
528         primaryKeyValues.add(pkeyValue);
529       } else {
530         String message = "populateSearchableEntityDocument(), pKeyValue is null for entityType = "
531             + resultDescriptor.getEntityName();
532         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
533       }
534     }
535
536     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
537     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
538
539     final List<String> searchTagFields = searchableDescriptor.getSearchableAttributes();
540
541     /*
542      * Based on configuration, use the configured field names for this entity-Type to build a
543      * multi-value collection of search tags for elastic search entity search criteria.
544      */
545     for (String searchTagField : searchTagFields) {
546       String searchTagValue = NodeUtils.getNodeFieldAsText(entityNode, searchTagField);
547       if (searchTagValue != null && !searchTagValue.isEmpty()) {
548         doc.addSearchTagWithKey(searchTagValue, searchTagField);
549       }
550     }
551   }
552
553   /**
554    * Fetch document for upsert.
555    *
556    * @param txn the txn
557    */
558   private void fetchDocumentForUpsert(NetworkTransaction txn) {
559     if (!txn.getOperationResult().wasSuccessful()) {
560       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
561       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
562       return;
563     }
564
565     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup
566         .getSearchableEntityDescriptors().get(txn.getDescriptor().getEntityName());
567     
568     try {
569       if (searchableDescriptor.hasSearchableAttributes()) {
570
571         final String jsonResult = txn.getOperationResult().getResult();
572         if (jsonResult != null && jsonResult.length() > 0) {
573
574           SearchableEntity se = new SearchableEntity();
575           se.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
576           populateSearchableEntityDocument(se, jsonResult, txn.getDescriptor());
577           se.deriveFields();
578
579           String link = null;
580           try {
581             link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
582           } catch (Exception exc) {
583             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
584           }
585
586           if (link != null) {
587             NetworkTransaction n2 = new NetworkTransaction();
588             n2.setLink(link);
589             n2.setEntityType(txn.getEntityType());
590             n2.setDescriptor(txn.getDescriptor());
591             n2.setOperationType(HttpMethod.GET);
592
593             esWorkOnHand.incrementAndGet();
594
595             supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
596             .whenComplete((result, error) -> {
597
598                   esWorkOnHand.decrementAndGet();
599
600                   if (error != null) {
601                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
602                   } else {
603                     updateElasticSearchCounters(result);
604                     performDocumentUpsert(result, se);
605                   }
606                 });
607           }
608         }
609
610       }
611     } catch (JsonProcessingException exc) {
612       // TODO -> LOG, waht should be logged here?
613     } catch (IOException exc) {
614       // TODO -> LOG, waht should be logged here?
615     }
616   }
617
618   /**
619    * Process store document result.
620    *
621    * @param esPutResult the es put result
622    * @param esGetResult the es get result
623    * @param se the se
624    */
625   private void processStoreDocumentResult(NetworkTransaction esPutResult,
626       NetworkTransaction esGetResult, SearchableEntity se) {
627
628     OperationResult or = esPutResult.getOperationResult();
629
630     if (!or.wasSuccessful()) {
631       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
632
633         if (shouldAllowRetry(se.getId())) {
634           esWorkOnHand.incrementAndGet();
635
636           RetrySearchableEntitySyncContainer rsc =
637               new RetrySearchableEntitySyncContainer(esGetResult, se);
638           retryQueue.push(rsc);
639
640           String message = "Store document failed during searchable entity synchronization"
641               + " due to version conflict. Entity will be re-synced.";
642           LOG.warn(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
643         }
644       } else {
645         String message =
646             "Store document failed during searchable entity synchronization with result code "
647                 + or.getResultCode() + " and result message " + or.getResult();
648         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
649       }
650     }
651   }
652
653   /**
654    * Perform retry sync.
655    */
656   private void performRetrySync() {
657     while (retryQueue.peek() != null) {
658
659       RetrySearchableEntitySyncContainer rsc = retryQueue.poll();
660       if (rsc != null) {
661
662         SearchableEntity se = rsc.getSearchableEntity();
663         NetworkTransaction txn = rsc.getNetworkTransaction();
664
665         final Consumer<NetworkTransaction> networkTransactionConsumer = (result) ->  performDocumentUpsert(result, se);
666         performRetrySync(se.getId(), networkTransactionConsumer, txn);
667
668       }
669     }
670   }
671
672   /**
673    * Should allow retry.
674    *
675    * @param id the id
676    * @return true, if successful
677    */
678   private boolean shouldAllowRetry(String id) {
679     boolean isRetryAllowed = true;
680     if (retryLimitTracker.get(id) != null) {
681       Integer currentCount = retryLimitTracker.get(id);
682       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
683         isRetryAllowed = false;
684         String message = "Searchable entity re-sync limit reached for " + id
685             + ", re-sync will no longer be attempted for this entity";
686         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
687       } else {
688         Integer newCount = new Integer(currentCount.intValue() + 1);
689         retryLimitTracker.put(id, newCount);
690       }
691     } else {
692       Integer firstRetryCount = new Integer(1);
693       retryLimitTracker.put(id, firstRetryCount);
694     }
695
696     return isRetryAllowed;
697   }
698
699   @Override
700   public SynchronizerState getState() {
701     if (!isSyncDone()) {
702       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
703     }
704
705     return SynchronizerState.IDLE;
706
707   }
708
709   /* (non-Javadoc)
710    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
711    */
712   @Override
713   public String getStatReport(boolean showFinalReport) {
714     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
715     return this.getStatReport(syncDurationInMs, showFinalReport);
716   }
717
718   /* (non-Javadoc)
719    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
720    */
721   @Override
722   public void shutdown() {
723     this.shutdownExecutors();
724   }
725
726   @Override
727   protected boolean isSyncDone() {
728     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
729
730     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
731       return false;
732     }
733
734     return true;
735   }
736
737 }