Merge "[AAI] Attempting to fix a compile run version mismatch in sparky"
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / viewinspect / sync / ViewInspectEntitySynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.viewinspect.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Deque;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentLinkedDeque;
34 import java.util.concurrent.ExecutorService;
35 import java.util.function.Consumer;
36 import java.util.function.Supplier;
37
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.aai.cl.mdc.MdcContext;
41 import org.onap.aai.restclient.client.OperationResult;
42 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
44 import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
45 import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
46 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
47 import org.onap.aai.sparky.dal.NetworkTransaction;
48 import org.onap.aai.sparky.dal.rest.HttpMethod;
49 import org.onap.aai.sparky.logging.AaiUiMsgs;
50 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
51 import org.onap.aai.sparky.sync.IndexSynchronizer;
52 import org.onap.aai.sparky.sync.SynchronizerConstants;
53 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
54 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
55 import org.onap.aai.sparky.sync.entity.MergableEntity;
56 import org.onap.aai.sparky.sync.entity.SearchableEntity;
57 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
58 import org.onap.aai.sparky.sync.enumeration.OperationState;
59 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
60 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
61 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
62 import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
63 import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
64 import org.onap.aai.sparky.util.NodeUtils;
65 import org.slf4j.MDC;
66
67 import com.fasterxml.jackson.core.JsonProcessingException;
68 import com.fasterxml.jackson.databind.JsonNode;
69 import com.fasterxml.jackson.databind.ObjectReader;
70 import com.fasterxml.jackson.databind.node.ArrayNode;
71
72 /**
73  * The Class SearchableEntitySynchronizer.
74  */
75 public class ViewInspectEntitySynchronizer extends AbstractEntitySynchronizer
76     implements IndexSynchronizer {
77
78   /**
79    * The Class RetrySearchableEntitySyncContainer.
80    */
81   private class RetrySearchableEntitySyncContainer {
82     NetworkTransaction txn;
83     SearchableEntity se;
84
85     /**
86      * Instantiates a new retry searchable entity sync container.
87      *
88      * @param txn the txn
89      * @param se the se
90      */
91     public RetrySearchableEntitySyncContainer(NetworkTransaction txn, SearchableEntity se) {
92       this.txn = txn;
93       this.se = se;
94     }
95
96     public NetworkTransaction getNetworkTransaction() {
97       return txn;
98     }
99
100     public SearchableEntity getSearchableEntity() {
101       return se;
102     }
103   }
104
105   private static final Logger LOG =
106       LoggerFactory.getInstance().getLogger(ViewInspectEntitySynchronizer.class);
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private Deque<RetrySearchableEntitySyncContainer> retryQueue;
111   private Map<String, Integer> retryLimitTracker;
112   protected ExecutorService esPutExecutor;
113   private OxmEntityLookup oxmEntityLookup;
114   private SearchableEntityLookup searchableEntityLookup;
115
116   /**
117    * Instantiates a new searchable entity synchronizer.
118    *
119    * @throws Exception the exception
120    */
121   public ViewInspectEntitySynchronizer(ElasticSearchSchemaConfig schemaConfig,
122       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
123       NetworkStatisticsConfig esStatConfig, OxmEntityLookup oxmEntityLookup,
124       SearchableEntityLookup searchableEntityLookup) throws Exception {
125     super(LOG, "SES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(),
126         aaiStatConfig, esStatConfig);
127     
128     this.oxmEntityLookup = oxmEntityLookup;
129     this.searchableEntityLookup = searchableEntityLookup;
130     this.allWorkEnumerated = false;
131     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
132     this.retryQueue = new ConcurrentLinkedDeque<RetrySearchableEntitySyncContainer>();
133     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
134     this.synchronizerName = "Searchable Entity Synchronizer";
135     this.esPutExecutor = NodeUtils.createNamedExecutor("SES-ES-PUT", 5, LOG);
136     this.aaiEntityStats.intializeEntityCounters(
137         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
138     this.esEntityStats.intializeEntityCounters(
139         searchableEntityLookup.getSearchableEntityDescriptors().keySet());
140     this.syncDurationInMs = -1;
141   }
142
143   /**
144    * Collect all the work.
145    *
146    * @return the operation state
147    */
148   private OperationState collectAllTheWork() {
149     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
150     Map<String, SearchableOxmEntityDescriptor> descriptorMap =
151         searchableEntityLookup.getSearchableEntityDescriptors();
152     
153     if (descriptorMap.isEmpty()) {
154       LOG.error(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
155       LOG.info(AaiUiMsgs.ERROR_LOADING_OXM_SEARCHABLE_ENTITIES);
156       return OperationState.ERROR;
157     }
158
159     Collection<String> syncTypes = descriptorMap.keySet();
160
161     /*Collection<String> syncTypes = new ArrayList<String>();
162     syncTypes.add("service-instance");*/
163
164     try {
165
166       /*
167        * launch a parallel async thread to process the documents for each entity-type (to max the
168        * of the configured executor anyway)
169        */
170
171       aaiWorkOnHand.set(syncTypes.size());
172
173       for (String key : syncTypes) {
174
175         supplyAsync(new Supplier<Void>() {
176
177           @Override
178           public Void get() {
179             MDC.setContextMap(contextMap);
180             OperationResult typeLinksResult = null;
181             try {
182               typeLinksResult = aaiAdapter.getSelfLinksByEntityType(key);
183               System.out.println(typeLinksResult);
184               aaiWorkOnHand.decrementAndGet();
185               processEntityTypeSelfLinks(typeLinksResult);
186             } catch (Exception exc) {
187               // TODO -> LOG, what should be logged here?
188             }
189
190             return null;
191           }
192
193         }, aaiExecutor).whenComplete((result, error) -> {
194
195           if (error != null) {
196             LOG.error(AaiUiMsgs.ERROR_GENERIC,
197                 "An error occurred getting data from AAI. Error = " + error.getMessage());
198           }
199         });
200
201       }
202
203       while (aaiWorkOnHand.get() != 0) {
204
205         if (LOG.isDebugEnabled()) {
206           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
207         }
208
209         Thread.sleep(1000);
210       }
211
212       aaiWorkOnHand.set(selflinks.size());
213       allWorkEnumerated = true;
214       syncEntityTypes();
215
216       while (!isSyncDone()) {
217         performRetrySync();
218         Thread.sleep(1000);
219       }
220
221       /*
222        * Make sure we don't hang on to retries that failed which could cause issues during future
223        * syncs
224        */
225       retryLimitTracker.clear();
226     } catch (InterruptedException e) {
227       // Restore interrupted state...
228       Thread.currentThread().interrupt();
229     } catch (Exception exc) {
230       // TODO -> LOG, waht should be logged here?
231     }
232
233     return OperationState.OK;
234   }
235
236   /* (non-Javadoc)
237    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
238    */
239   @Override
240   public OperationState doSync() {
241     this.syncDurationInMs = -1;
242     String txnID = NodeUtils.getRandomTxnId();
243     MdcContext.initialize(txnID, "SearchableEntitySynchronizer", "", "Sync", "");
244     
245     resetCounters();
246     this.allWorkEnumerated = false;
247     syncStartedTimeStampInMs = System.currentTimeMillis();
248     collectAllTheWork();
249
250     return OperationState.OK;
251   }
252
253   /**
254    * Process entity type self links.
255    *
256    * @param operationResult the operation result
257    */
258   private void processEntityTypeSelfLinks(OperationResult operationResult) {
259
260     final String jsonResult = operationResult.getResult();
261
262     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
263
264       try {
265         JsonNode rootNode = mapper.readTree(jsonResult);
266         JsonNode resultData = rootNode.get("result-data");
267
268         if (resultData.isArray()) {
269           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
270
271           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
272
273           while (elementIterator.hasNext()) {
274             JsonNode element = elementIterator.next();
275
276             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
277             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
278
279             SearchableOxmEntityDescriptor descriptor = null;
280
281             if (resourceType != null && resourceLink != null) {
282
283               descriptor = searchableEntityLookup.getSearchableEntityDescriptors().get(resourceType);
284
285               if (descriptor == null) {
286                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
287                 // go to next element in iterator
288                 continue;
289               }
290
291               if (descriptor.hasSearchableAttributes()) {
292                 selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
293               }
294
295             }
296           }
297         }
298       } catch (IOException exc) {
299         String message =
300             "Could not deserialize JSON (representing operation result) as node tree. " +
301             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
302         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
303       }
304     }
305
306   }
307
308   /**
309    * Sync entity types.
310    */
311   private void syncEntityTypes() {
312
313     while (selflinks.peek() != null) {
314
315       SelfLinkDescriptor linkDescriptor = selflinks.poll();
316       aaiWorkOnHand.decrementAndGet();
317
318       OxmEntityDescriptor descriptor = null;
319
320       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
321
322         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
323
324         if (descriptor == null) {
325           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
326           // go to next element in iterator
327           continue;
328         }
329
330         NetworkTransaction txn = new NetworkTransaction();
331         txn.setDescriptor(descriptor);
332         txn.setLink(linkDescriptor.getSelfLink());
333         txn.setOperationType(HttpMethod.GET);
334         txn.setEntityType(linkDescriptor.getEntityType());
335
336         aaiWorkOnHand.incrementAndGet();
337
338         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
339             .whenComplete((result, error) -> {
340
341               aaiWorkOnHand.decrementAndGet();
342
343               if (error != null) {
344                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
345               } else {
346                 if (result == null) {
347                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
348                       linkDescriptor.getSelfLink());
349                 } else {
350                   updateActiveInventoryCounters(result);
351                   fetchDocumentForUpsert(result);
352                 }
353               }
354             });
355       }
356
357     }
358
359   }
360
361   /**
362    * Perform document upsert.
363    *
364    * @param esGetTxn the es get txn
365    * @param se the se
366    */
367   protected void performDocumentUpsert(NetworkTransaction esGetTxn, SearchableEntity se) {
368     /**
369      * <p>
370      * <ul>
371      * As part of the response processing we need to do the following:
372      * <li>1. Extract the version (if present), it will be the ETAG when we use the
373      * Search-Abstraction-Service
374      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
375      * tag
376      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
377      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
378      * </ul>
379      * </p>
380      */
381     String link = null;
382     try {
383       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
384     } catch (Exception exc) {
385       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
386       return;
387     }
388
389     String versionNumber = null;
390     boolean wasEntryDiscovered = false;
391     if (esGetTxn.getOperationResult().getResultCode() == 404) {
392       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, se.getEntityPrimaryKeyValue());
393     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
394       wasEntryDiscovered = true;
395       try {
396         versionNumber = NodeUtils.extractFieldValueFromObject(
397             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
398             "etag");
399       } catch (IOException exc) {
400         String message =
401             "Error extracting version number from response, aborting searchable entity sync of "
402                 + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
403         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
404         return;
405       }
406     } else {
407       /*
408        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
409        * return.
410        */
411       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
412           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
413       return;
414     }
415
416     try {
417       String jsonPayload = null;
418       if (wasEntryDiscovered) {
419         try {
420           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
421           NodeUtils.extractObjectsByKey(
422               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
423               "content", sourceObject);
424
425           if (!sourceObject.isEmpty()) {
426             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
427             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
428             ObjectReader updater = mapper.readerForUpdating(me);
429             MergableEntity merged = updater.readValue(NodeUtils.convertObjectToJson(se,false));
430             jsonPayload = mapper.writeValueAsString(merged);
431           }
432         } catch (IOException exc) {
433           String message =
434               "Error extracting source value from response, aborting searchable entity sync of "
435                   + se.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
436           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
437           return;
438         }
439       } else {
440           jsonPayload = se.getAsJson();
441       }
442
443       if (wasEntryDiscovered) {
444         if (versionNumber != null && jsonPayload != null) {
445
446                 String requestPayload = searchServiceAdapter.buildBulkImportOperationRequest(getIndexName(),
447                se.getId(), versionNumber, jsonPayload);
448
449           NetworkTransaction transactionTracker = new NetworkTransaction();
450           transactionTracker.setEntityType(esGetTxn.getEntityType());
451           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
452           transactionTracker.setOperationType(HttpMethod.PUT);
453
454           esWorkOnHand.incrementAndGet();
455           supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
456                   requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
457                       .whenComplete((result, error) -> {
458
459                     esWorkOnHand.decrementAndGet();
460
461                     if (error != null) {
462                       String message = "Searchable entity sync UPDATE PUT error - "
463                           + error.getLocalizedMessage();
464                       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
465                     } else {
466                       updateElasticSearchCounters(result);
467                       processStoreDocumentResult(result, esGetTxn, se);
468                     }
469                   });
470         }
471
472       } else {
473         
474         if (link != null && jsonPayload != null) {
475
476           NetworkTransaction updateElasticTxn = new NetworkTransaction();
477           updateElasticTxn.setLink(link);
478           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
479           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
480           updateElasticTxn.setOperationType(HttpMethod.PUT);
481
482           esWorkOnHand.incrementAndGet();
483           supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
484                   esPutExecutor).whenComplete((result, error) -> {
485
486                 esWorkOnHand.decrementAndGet();
487
488                 if (error != null) {
489                   String message =
490                       "Searchable entity sync UPDATE PUT error - " + error.getLocalizedMessage();
491                   LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
492                 } else {
493                   updateElasticSearchCounters(result);
494                   processStoreDocumentResult(result, esGetTxn, se);
495                 }
496               });
497         }
498       }
499     } catch (Exception exc) {
500       String message = "Exception caught during searchable entity sync PUT operation. Message - "
501           + exc.getLocalizedMessage();
502       LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
503     }
504   }
505
506   /**
507    * Populate searchable entity document.
508    *
509    * @param doc the doc
510    * @param result the result
511    * @param resultDescriptor the result descriptor
512    * @throws JsonProcessingException the json processing exception
513    * @throws IOException Signals that an I/O exception has occurred.
514    */
515   protected void populateSearchableEntityDocument(SearchableEntity doc, String result,
516       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
517
518     doc.setEntityType(resultDescriptor.getEntityName());
519
520     JsonNode entityNode = mapper.readTree(result);
521
522     List<String> primaryKeyValues = new ArrayList<String>();
523     String pkeyValue = null;
524
525     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup.getSearchableEntityDescriptors().get(resultDescriptor.getEntityName());
526     
527     for (String keyName : searchableDescriptor.getPrimaryKeyAttributeNames()) {
528       pkeyValue = NodeUtils.getNodeFieldAsText(entityNode, keyName);
529       if (pkeyValue != null) {
530         primaryKeyValues.add(pkeyValue);
531       } else {
532         String message = "populateSearchableEntityDocument(), pKeyValue is null for entityType = "
533             + resultDescriptor.getEntityName();
534         LOG.warn(AaiUiMsgs.WARN_GENERIC, message);
535       }
536     }
537
538     final String primaryCompositeKeyValue = NodeUtils.concatArray(primaryKeyValues, "/");
539     doc.setEntityPrimaryKeyValue(primaryCompositeKeyValue);
540
541     final List<String> searchTagFields = searchableDescriptor.getSearchableAttributes();
542
543     /*
544      * Based on configuration, use the configured field names for this entity-Type to build a
545      * multi-value collection of search tags for elastic search entity search criteria.
546      */
547     for (String searchTagField : searchTagFields) {
548       String searchTagValue = NodeUtils.getNodeFieldAsText(entityNode, searchTagField);
549       if (searchTagValue != null && !searchTagValue.isEmpty()) {
550         doc.addSearchTagWithKey(searchTagValue, searchTagField);
551       }
552     }
553   }
554
555   /**
556    * Fetch document for upsert.
557    *
558    * @param txn the txn
559    */
560   private void fetchDocumentForUpsert(NetworkTransaction txn) {
561     if (!txn.getOperationResult().wasSuccessful()) {
562       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
563       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
564       return;
565     }
566
567     SearchableOxmEntityDescriptor searchableDescriptor = searchableEntityLookup
568         .getSearchableEntityDescriptors().get(txn.getDescriptor().getEntityName());
569     
570     try {
571       if (searchableDescriptor.hasSearchableAttributes()) {
572
573         final String jsonResult = txn.getOperationResult().getResult();
574         if (jsonResult != null && jsonResult.length() > 0) {
575
576           SearchableEntity se = new SearchableEntity();
577           se.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
578           populateSearchableEntityDocument(se, jsonResult, txn.getDescriptor());
579           se.deriveFields();
580
581           String link = null;
582           try {
583             link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), se.getId());
584           } catch (Exception exc) {
585             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
586           }
587
588           if (link != null) {
589             NetworkTransaction n2 = new NetworkTransaction();
590             n2.setLink(link);
591             n2.setEntityType(txn.getEntityType());
592             n2.setDescriptor(txn.getDescriptor());
593             n2.setOperationType(HttpMethod.GET);
594
595             esWorkOnHand.incrementAndGet();
596
597             supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
598             .whenComplete((result, error) -> {
599
600                   esWorkOnHand.decrementAndGet();
601
602                   if (error != null) {
603                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
604                   } else {
605                     updateElasticSearchCounters(result);
606                     performDocumentUpsert(result, se);
607                   }
608                 });
609           }
610         }
611
612       }
613     } catch (JsonProcessingException exc) {
614       // TODO -> LOG, waht should be logged here?
615     } catch (IOException exc) {
616       // TODO -> LOG, waht should be logged here?
617     }
618   }
619
620   /**
621    * Process store document result.
622    *
623    * @param esPutResult the es put result
624    * @param esGetResult the es get result
625    * @param se the se
626    */
627   private void processStoreDocumentResult(NetworkTransaction esPutResult,
628       NetworkTransaction esGetResult, SearchableEntity se) {
629
630     OperationResult or = esPutResult.getOperationResult();
631
632     if (!or.wasSuccessful()) {
633       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
634
635         if (shouldAllowRetry(se.getId())) {
636           esWorkOnHand.incrementAndGet();
637
638           RetrySearchableEntitySyncContainer rsc =
639               new RetrySearchableEntitySyncContainer(esGetResult, se);
640           retryQueue.push(rsc);
641
642           String message = "Store document failed during searchable entity synchronization"
643               + " due to version conflict. Entity will be re-synced.";
644           LOG.warn(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
645         }
646       } else {
647         String message =
648             "Store document failed during searchable entity synchronization with result code "
649                 + or.getResultCode() + " and result message " + or.getResult();
650         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
651       }
652     }
653   }
654
655   /**
656    * Perform retry sync.
657    */
658   private void performRetrySync() {
659     while (retryQueue.peek() != null) {
660
661       RetrySearchableEntitySyncContainer rsc = retryQueue.poll();
662       if (rsc != null) {
663
664         SearchableEntity se = rsc.getSearchableEntity();
665         NetworkTransaction txn = rsc.getNetworkTransaction();
666
667         final Consumer<NetworkTransaction> networkTransactionConsumer = (result) ->  performDocumentUpsert(result, se);
668         performRetrySync(se.getId(), networkTransactionConsumer, txn);
669
670       }
671     }
672   }
673
674   /**
675    * Should allow retry.
676    *
677    * @param id the id
678    * @return true, if successful
679    */
680   private boolean shouldAllowRetry(String id) {
681     boolean isRetryAllowed = true;
682     if (retryLimitTracker.get(id) != null) {
683       Integer currentCount = retryLimitTracker.get(id);
684       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
685         isRetryAllowed = false;
686         String message = "Searchable entity re-sync limit reached for " + id
687             + ", re-sync will no longer be attempted for this entity";
688         LOG.error(AaiUiMsgs.ES_SEARCHABLE_ENTITY_SYNC_ERROR, message);
689       } else {
690         Integer newCount = new Integer(currentCount.intValue() + 1);
691         retryLimitTracker.put(id, newCount);
692       }
693     } else {
694       Integer firstRetryCount = new Integer(1);
695       retryLimitTracker.put(id, firstRetryCount);
696     }
697
698     return isRetryAllowed;
699   }
700
701   @Override
702   public SynchronizerState getState() {
703     if (!isSyncDone()) {
704       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
705     }
706
707     return SynchronizerState.IDLE;
708
709   }
710
711   /* (non-Javadoc)
712    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
713    */
714   @Override
715   public String getStatReport(boolean showFinalReport) {
716     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
717     return this.getStatReport(syncDurationInMs, showFinalReport);
718   }
719
720   /* (non-Javadoc)
721    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
722    */
723   @Override
724   public void shutdown() {
725     this.shutdownExecutors();
726   }
727
728   @Override
729   protected boolean isSyncDone() {
730     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
731
732     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
733       return false;
734     }
735
736     return true;
737   }
738
739 }