Fix sonar issues :
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.sparky.aggregation.sync;
22
23 import static java.util.concurrent.CompletableFuture.supplyAsync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Deque;
28 import java.util.EnumSet;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentLinkedDeque;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.function.Consumer;
36 import java.util.function.Supplier;
37
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.aai.cl.mdc.MdcContext;
41 import org.onap.aai.restclient.client.OperationResult;
42 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
43 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
44 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
45 import org.onap.aai.sparky.dal.NetworkTransaction;
46 import org.onap.aai.sparky.dal.rest.HttpMethod;
47 import org.onap.aai.sparky.logging.AaiUiMsgs;
48 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
49 import org.onap.aai.sparky.sync.IndexSynchronizer;
50 import org.onap.aai.sparky.sync.SynchronizerConstants;
51 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
52 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
53 import org.onap.aai.sparky.sync.entity.AggregationEntity;
54 import org.onap.aai.sparky.sync.entity.MergableEntity;
55 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
56 import org.onap.aai.sparky.sync.enumeration.OperationState;
57 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
58 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
59 import org.onap.aai.sparky.sync.task.PerformSearchServicePut;
60 import org.onap.aai.sparky.sync.task.PerformSearchServiceRetrieval;
61 import org.onap.aai.sparky.sync.task.PerformSearchServiceUpdate;
62 import org.onap.aai.sparky.util.NodeUtils;
63 import org.slf4j.MDC;
64
65 import com.fasterxml.jackson.core.JsonProcessingException;
66 import com.fasterxml.jackson.databind.JsonNode;
67 import com.fasterxml.jackson.databind.ObjectReader;
68 import com.fasterxml.jackson.databind.node.ArrayNode;
69
70 /**
71  * The Class AggregationSynchronizer.
72  */
73 public class AggregationSynchronizer extends AbstractEntitySynchronizer
74     implements IndexSynchronizer {
75
76   /**
77    * The Class RetryAggregationEntitySyncContainer.
78    */
79   private class RetryAggregationEntitySyncContainer {
80     NetworkTransaction txn;
81     AggregationEntity ae;
82
83     /**
84      * Instantiates a new retry aggregation entity sync container.
85      *
86      * @param txn the txn
87      * @param ae the se
88      */
89     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
90       this.txn = txn;
91       this.ae = ae;
92     }
93
94     public NetworkTransaction getNetworkTransaction() {
95       return txn;
96     }
97
98     public AggregationEntity getAggregationEntity() {
99       return ae;
100     }
101   }
102   
103   private static final Logger LOG =
104       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
105   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
106
107   private boolean allWorkEnumerated;
108   private Deque<SelfLinkDescriptor> selflinks;
109   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
110   private Map<String, Integer> retryLimitTracker;
111   protected ExecutorService esPutExecutor;
112   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
113   private boolean syncInProgress;
114   private Map<String, String> contextMap;
115   private String entityType;
116   private ElasticSearchSchemaConfig schemaConfig;
117   private OxmEntityLookup oxmEntityLookup;
118   
119   /**
120    * Instantiates a new entity aggregation synchronizer.
121    *
122    * @throws Exception the exception
123    */
124   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
125       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
126       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
127       OxmEntityLookup oxmEntityLookup) throws Exception {
128     
129     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
130         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
131     
132     this.oxmEntityLookup = oxmEntityLookup;
133     
134     this.schemaConfig = schemaConfig;
135     this.entityType = entityType;
136     this.allWorkEnumerated = false;
137     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
138     this.synchronizerName = "Entity Aggregation Synchronizer";
139     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
140     this.syncInProgress = false;
141     this.allWorkEnumerated = false;
142     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
143     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
144     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
145     
146     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
147     
148     this.aaiEntityStats.intializeEntityCounters(entityType);
149     this.esEntityStats.intializeEntityCounters(entityType);
150         
151     this.contextMap = MDC.getCopyOfContextMap();
152   }
153
154   /**
155    * Collect all the work.
156    *
157    * @return the operation state
158    */
159   private OperationState collectAllTheWork() {
160     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
161     final String entity = this.getEntityType();
162     try {
163
164       aaiWorkOnHand.set(1);
165
166       supplyAsync(new Supplier<Void>() {
167
168         @Override
169         public Void get() {
170           MDC.setContextMap(contextMap);
171           OperationResult typeLinksResult = null;
172           try {
173             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
174             aaiWorkOnHand.decrementAndGet();
175             processEntityTypeSelfLinks(typeLinksResult);
176           } catch (Exception exc) {
177                   LOG.error(AaiUiMsgs.ERROR_GENERIC, "Processing execption while building working set.  Error:" 
178                        + exc.getMessage());
179           }
180
181           return null;
182         }
183
184       }, aaiExecutor).whenComplete((result, error) -> {
185
186         if (error != null) {
187           LOG.error(AaiUiMsgs.ERROR_GENERIC,
188               "An error occurred getting data from AAI. Error = " + error.getMessage());
189         }
190       });
191
192       while (aaiWorkOnHand.get() != 0) {
193
194         if (LOG.isDebugEnabled()) {
195           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
196         }
197
198         Thread.sleep(1000);
199       }
200
201       aaiWorkOnHand.set(selflinks.size());
202       allWorkEnumerated = true;
203       syncEntityTypes();
204
205       while (!isSyncDone()) {
206         performRetrySync();
207         Thread.sleep(1000);
208       }
209
210       /*
211        * Make sure we don't hang on to retries that failed which could cause issues during future
212        * syncs
213        */
214       retryLimitTracker.clear();
215     } catch (InterruptedException e) {
216       // Restore interrupted state...
217       Thread.currentThread().interrupt();
218     } catch (Exception exc) {
219       // TODO -> LOG, waht should be logged here?
220     }
221
222     return OperationState.OK;
223   }
224   
225   
226   /**
227    * Perform retry sync.
228    */
229   private void performRetrySync() {
230     while (retryQueue.peek() != null) {
231
232       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
233       if (rsc != null) {
234
235         AggregationEntity ae = rsc.getAggregationEntity();
236         NetworkTransaction txn = rsc.getNetworkTransaction();
237
238         final Consumer<NetworkTransaction> networkTransactionConsumer = (result) -> performDocumentUpsert(result, ae);
239         performRetrySync(ae.getId(), networkTransactionConsumer, txn);
240
241       }
242     }
243   }
244
245
246   /**
247    * Perform document upsert.
248    *
249    * @param esGetTxn the es get txn
250    * @param ae the ae
251    */
252   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
253     /**
254      * <p>
255      * <ul>
256      * As part of the response processing we need to do the following:
257      * <li>1. Extract the version (if present), it will be the ETAG when we use the
258      * Search-Abstraction-Service
259      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
260      * tag
261      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
262      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
263      * </ul>
264      * </p>
265      */
266     String link = null;
267     try {
268       link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId()); 
269     } catch (Exception exc) {
270       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
271       return;
272     }
273
274     String versionNumber = null;
275     boolean wasEntryDiscovered = false;
276     if (esGetTxn.getOperationResult().getResultCode() == 404) {
277       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
278     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
279       wasEntryDiscovered = true;
280       try {
281         versionNumber = NodeUtils.extractFieldValueFromObject(
282             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
283             "etag");
284       } catch (IOException exc) {
285         String message =
286             "Error extracting version number from response, aborting aggregation entity sync of "
287                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
288         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
289         return;
290       }
291     } else {
292       /*
293        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
294        * return.
295        */
296       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
297           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
298       return;
299     }
300
301     try {
302       String jsonPayload = null;
303       if (wasEntryDiscovered) {
304         try {
305           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
306           NodeUtils.extractObjectsByKey(
307               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
308               "content", sourceObject);
309
310           if (!sourceObject.isEmpty()) {
311             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
312             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
313             ObjectReader updater = mapper.readerForUpdating(me);
314             MergableEntity merged = updater.readValue(ae.getAsJson());
315             jsonPayload = mapper.writeValueAsString(merged);
316           }
317         } catch (IOException exc) {
318           String message =
319               "Error extracting source value from response, aborting aggregation entity sync of "
320                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
321           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
322           return;
323         }
324       } else {
325         jsonPayload = ae.getAsJson();
326       }
327
328       if (wasEntryDiscovered) {
329         if (versionNumber != null && jsonPayload != null) {
330
331                 String requestPayload =
332                           searchServiceAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
333                         versionNumber, ae.getId(),jsonPayload);
334
335           NetworkTransaction transactionTracker = new NetworkTransaction();
336           transactionTracker.setEntityType(esGetTxn.getEntityType());
337           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
338           transactionTracker.setOperationType(HttpMethod.PUT);
339
340           esWorkOnHand.incrementAndGet();
341           supplyAsync(new PerformSearchServiceUpdate(searchServiceAdapter.buildSearchServiceBulkUrl(),
342                   requestPayload, searchServiceAdapter, transactionTracker), esPutExecutor)
343           .whenComplete((result, error) -> {
344
345                     esWorkOnHand.decrementAndGet();
346
347                     if (error != null) {
348                       String message = "Aggregation entity sync UPDATE PUT error - "
349                           + error.getLocalizedMessage();
350                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
351                     } else {
352                       updateElasticSearchCounters(result);
353                       processStoreDocumentResult(result, esGetTxn, ae);
354                     }
355                   });
356         }
357
358       } else {
359         if (link != null && jsonPayload != null) {
360
361           NetworkTransaction updateElasticTxn = new NetworkTransaction();
362           updateElasticTxn.setLink(link);
363           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
364           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
365           updateElasticTxn.setOperationType(HttpMethod.PUT);
366
367           esWorkOnHand.incrementAndGet();
368           supplyAsync(new PerformSearchServicePut(jsonPayload, updateElasticTxn, searchServiceAdapter),
369                   esPutExecutor).whenComplete((result, error) -> {
370
371                 esWorkOnHand.decrementAndGet();
372
373                 if (error != null) {
374                   String message =
375                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
376                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
377                 } else {
378                   updateElasticSearchCounters(result);
379                   processStoreDocumentResult(result, esGetTxn, ae);
380                 }
381               });
382         }
383       }
384     } catch (Exception exc) {
385       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
386           + exc.getLocalizedMessage();
387       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
388     }
389   }
390   
391   /**
392    * Should allow retry.
393    *
394    * @param id the id
395    * @return true, if successful
396    */
397   private boolean shouldAllowRetry(String id) {
398     boolean isRetryAllowed = true;
399     if (retryLimitTracker.get(id) != null) {
400       Integer currentCount = retryLimitTracker.get(id);
401       if (currentCount >= RETRY_COUNT_PER_ENTITY_LIMIT) {
402         isRetryAllowed = false;
403         String message = "Aggregation entity re-sync limit reached for " + id
404             + ", re-sync will no longer be attempted for this entity";
405         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
406       } else {
407         Integer newCount = currentCount + 1;
408         retryLimitTracker.put(id, newCount);
409       }
410     } else {
411       Integer firstRetryCount = 1;
412       retryLimitTracker.put(id, firstRetryCount);
413     }
414
415     return isRetryAllowed;
416   }
417   
418   /**
419    * Process store document result.
420    *
421    * @param esPutResult the es put result
422    * @param esGetResult the es get result
423    * @param ae the ae
424    */
425   private void processStoreDocumentResult(NetworkTransaction esPutResult,
426       NetworkTransaction esGetResult, AggregationEntity ae) {
427
428     OperationResult or = esPutResult.getOperationResult();
429
430     if (!or.wasSuccessful()) {
431       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
432
433         if (shouldAllowRetry(ae.getId())) {
434           esWorkOnHand.incrementAndGet();
435
436           RetryAggregationEntitySyncContainer rsc =
437               new RetryAggregationEntitySyncContainer(esGetResult, ae);
438           retryQueue.push(rsc);
439
440           String message = "Store document failed during aggregation entity synchronization"
441               + " due to version conflict. Entity will be re-synced.";
442           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
443         }
444       } else {
445         String message =
446             "Store document failed during aggregation entity synchronization with result code "
447                 + or.getResultCode() + " and result message " + or.getResult();
448         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
449       }
450     }
451   }
452   
453   /**
454    * Sync entity types.
455    */
456   private void syncEntityTypes() {
457
458     while (selflinks.peek() != null) {
459
460       SelfLinkDescriptor linkDescriptor = selflinks.poll();
461       aaiWorkOnHand.decrementAndGet();
462
463       OxmEntityDescriptor descriptor = null;
464
465       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
466
467         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
468
469         if (descriptor == null) {
470           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
471           // go to next element in iterator
472           continue;
473         }
474
475         NetworkTransaction txn = new NetworkTransaction();
476         txn.setDescriptor(descriptor);
477         txn.setLink(linkDescriptor.getSelfLink());
478         txn.setOperationType(HttpMethod.GET);
479         txn.setEntityType(linkDescriptor.getEntityType());
480
481         aaiWorkOnHand.incrementAndGet();
482
483         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter,"sync"), aaiExecutor)
484             .whenComplete((result, error) -> {
485
486               aaiWorkOnHand.decrementAndGet();
487
488               if (error != null) {
489                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
490               } else {
491                 if (result == null) {
492                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
493                       linkDescriptor.getSelfLink());
494                 } else {
495                   updateActiveInventoryCounters(result);
496                   fetchDocumentForUpsert(result);
497                 }
498               }
499             });
500       }
501
502     }
503
504   }
505   
506   /**
507    * Fetch document for upsert.
508    *
509    * @param txn the txn
510    */
511   private void fetchDocumentForUpsert(NetworkTransaction txn) {
512     // modified
513     if (!txn.getOperationResult().wasSuccessful()) {
514       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
515       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
516       return;
517     }
518
519     try {
520         final String jsonResult = txn.getOperationResult().getResult();
521         if (jsonResult != null && jsonResult.length() > 0) {
522
523           AggregationEntity ae = new AggregationEntity();
524           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
525           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
526           ae.deriveFields();
527
528           String link = null;
529           try {
530             link = searchServiceAdapter.buildSearchServiceDocUrl(getIndexName(), ae.getId());
531           } catch (Exception exc) {
532             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
533           }
534
535           if (link != null) {
536             NetworkTransaction n2 = new NetworkTransaction();
537             n2.setLink(link);
538             n2.setEntityType(txn.getEntityType());
539             n2.setDescriptor(txn.getDescriptor());
540             n2.setOperationType(HttpMethod.GET);
541
542             esWorkOnHand.incrementAndGet();
543
544             supplyAsync(new PerformSearchServiceRetrieval(n2, searchServiceAdapter), esExecutor)
545             .whenComplete((result, error) -> {
546
547                   esWorkOnHand.decrementAndGet();
548
549                   if (error != null) {
550                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
551                   } else {
552                     updateElasticSearchCounters(result);
553                     performDocumentUpsert(result, ae);
554                   }
555                 });
556           }
557         }
558
559                 } catch (JsonProcessingException exc) {
560                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
561                                         "There was a JSON processing error fetching the elastic document for upsert.  Error: "
562                                                         + exc.getMessage());
563                 } catch (IOException exc) {
564                         LOG.error(AaiUiMsgs.ERROR_GENERIC,
565                                         "There was an IO error fetching the elastic document for upsert.  Error: " + exc.getMessage());
566                 }
567           }
568   
569   
570   /**
571    * Populate aggregation entity document.
572    *
573    * @param doc the doc
574    * @param result the result
575    * @param resultDescriptor the result descriptor
576    * @throws JsonProcessingException the json processing exception
577    * @throws IOException Signals that an I/O exception has occurred.
578    */
579   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
580       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
581     doc.setEntityType(resultDescriptor.getEntityName());
582     JsonNode entityNode = mapper.readTree(result);
583     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
584     doc.copyAttributeKeyValuePair(map);
585   }
586
587   /**
588    * Process entity type self links.
589    *
590    * @param operationResult the operation result
591    */
592   private void processEntityTypeSelfLinks(OperationResult operationResult) {
593
594     if (operationResult == null) {
595       return;
596     }
597
598     final String jsonResult = operationResult.getResult();
599
600     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
601
602       try {
603         JsonNode rootNode = mapper.readTree(jsonResult);
604
605         JsonNode resultData = rootNode.get("result-data");
606
607         if (resultData.isArray()) {
608           ArrayNode resultDataArrayNode = (ArrayNode) resultData;
609
610           Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
611
612           while (elementIterator.hasNext()) {
613             JsonNode element = elementIterator.next();
614
615             final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
616             final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
617
618             if (resourceType != null && resourceLink != null) {
619
620               OxmEntityDescriptor descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
621
622               if (descriptor == null) {
623                 LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
624                 // go to next element in iterator
625                 continue;
626               }
627
628               selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
629
630             }
631           }
632         }
633       } catch (IOException exc) {
634         String message =
635                 "Could not deserialize JSON (representing operation result) as node tree. " +
636                         "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
637         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
638       }
639     }
640
641   }
642
643   /*
644    * (non-Javadoc)
645    * 
646    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
647    */
648   @Override
649   public OperationState doSync() {
650     this.syncDurationInMs = -1;
651     syncStartedTimeStampInMs = System.currentTimeMillis();
652     String txnID = NodeUtils.getRandomTxnId();
653     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
654     
655     return collectAllTheWork();
656   }
657
658   @Override
659   public SynchronizerState getState() {
660
661     if (!isSyncDone()) {
662       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
663     }
664
665     return SynchronizerState.IDLE;
666
667   }
668
669   /*
670    * (non-Javadoc)
671    * 
672    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
673    */
674   @Override
675   public String getStatReport(boolean showFinalReport) {
676     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
677     return getStatReport(syncDurationInMs, showFinalReport);
678   }
679
680   public String getEntityType() {
681     return entityType;
682   }
683
684   public void setEntityType(String entityType) {
685     this.entityType = entityType;
686   }
687
688   /*
689    * (non-Javadoc)
690    * 
691    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
692    */
693   @Override
694   public void shutdown() {
695     this.shutdownExecutors();
696   }
697
698   @Override
699   protected boolean isSyncDone() {
700
701     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
702
703     if (LOG.isDebugEnabled()) {
704       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
705           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
706     }
707
708     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
709       return false;
710     }
711
712     this.syncInProgress = false;
713
714     return true;
715   }
716
717   /*
718    * (non-Javadoc)
719    * 
720    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
721    */
722   @Override
723   public void clearCache() {
724
725     if (syncInProgress) {
726       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
727           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
728       return;
729     }
730
731     super.clearCache();
732     this.resetCounters();
733     if (entityCounters != null) {
734       entityCounters.clear();
735     }
736
737     allWorkEnumerated = false;
738
739   }
740
741 }