36cd8bc3e2ad64279180df8b1ac076fdecd4ede5
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / aggregation / sync / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.aggregation.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Deque;
30 import java.util.EnumSet;
31 import java.util.Iterator;
32 import java.util.Map;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import java.util.function.Supplier;
38
39 import org.onap.aai.cl.api.Logger;
40 import org.onap.aai.cl.eelf.LoggerFactory;
41 import org.onap.aai.cl.mdc.MdcContext;
42 import org.onap.aai.restclient.client.OperationResult;
43 import org.onap.aai.sparky.config.oxm.OxmEntityDescriptor;
44 import org.onap.aai.sparky.config.oxm.OxmEntityLookup;
45 import org.onap.aai.sparky.dal.ActiveInventoryAdapter;
46 import org.onap.aai.sparky.dal.NetworkTransaction;
47 import org.onap.aai.sparky.dal.rest.HttpMethod;
48 import org.onap.aai.sparky.logging.AaiUiMsgs;
49 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
50 import org.onap.aai.sparky.sync.IndexSynchronizer;
51 import org.onap.aai.sparky.sync.SynchronizerConstants;
52 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
53 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
54 import org.onap.aai.sparky.sync.entity.AggregationEntity;
55 import org.onap.aai.sparky.sync.entity.MergableEntity;
56 import org.onap.aai.sparky.sync.entity.SelfLinkDescriptor;
57 import org.onap.aai.sparky.sync.enumeration.OperationState;
58 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
59 import org.onap.aai.sparky.sync.task.PerformActiveInventoryRetrieval;
60 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
61 import org.onap.aai.sparky.sync.task.PerformElasticSearchRetrieval;
62 import org.onap.aai.sparky.sync.task.PerformElasticSearchUpdate;
63 import org.onap.aai.sparky.util.NodeUtils;
64 import org.slf4j.MDC;
65
66 import com.fasterxml.jackson.core.JsonProcessingException;
67 import com.fasterxml.jackson.databind.JsonNode;
68 import com.fasterxml.jackson.databind.ObjectReader;
69 import com.fasterxml.jackson.databind.node.ArrayNode;
70
71 /**
72  * The Class AutosuggestionSynchronizer.
73  */
74 public class AggregationSynchronizer extends AbstractEntitySynchronizer
75     implements IndexSynchronizer {
76
77   /**
78    * The Class RetryAggregationEntitySyncContainer.
79    */
80   private class RetryAggregationEntitySyncContainer {
81     NetworkTransaction txn;
82     AggregationEntity ae;
83
84     /**
85      * Instantiates a new retry aggregation entity sync container.
86      *
87      * @param txn the txn
88      * @param ae the se
89      */
90     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
91       this.txn = txn;
92       this.ae = ae;
93     }
94
95     public NetworkTransaction getNetworkTransaction() {
96       return txn;
97     }
98
99     public AggregationEntity getAggregationEntity() {
100       return ae;
101     }
102   }
103   
104   private static final Logger LOG =
105       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
106   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
107
108   private boolean allWorkEnumerated;
109   private Deque<SelfLinkDescriptor> selflinks;
110   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
111   private Map<String, Integer> retryLimitTracker;
112   protected ExecutorService esPutExecutor;
113   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
114   private boolean syncInProgress;
115   private Map<String, String> contextMap;
116   private String entityType;
117   private ElasticSearchSchemaConfig schemaConfig;
118   private OxmEntityLookup oxmEntityLookup;
119   
120   /**
121    * Instantiates a new entity aggregation synchronizer.
122    *
123    * @param indexName the index name
124    * @throws Exception the exception
125    */
126   public AggregationSynchronizer(String entityType, ElasticSearchSchemaConfig schemaConfig,
127       int numSyncWorkers, int numActiveInventoryWorkers, int numElasticWorkers,
128       NetworkStatisticsConfig aaiStatConfig, NetworkStatisticsConfig esStatConfig,
129       OxmEntityLookup oxmEntityLookup) throws Exception {
130     
131     super(LOG, "AGGES-" + schemaConfig.getIndexName().toUpperCase(), numSyncWorkers,
132         numActiveInventoryWorkers, numElasticWorkers, schemaConfig.getIndexName(),aaiStatConfig, esStatConfig); 
133     
134     this.oxmEntityLookup = oxmEntityLookup;
135     
136     this.schemaConfig = schemaConfig;
137     this.entityType = entityType;
138     this.allWorkEnumerated = false;
139     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
140     this.synchronizerName = "Entity Aggregation Synchronizer";
141     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
142     this.syncInProgress = false;
143     this.allWorkEnumerated = false;
144     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
145     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
146     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
147     
148     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
149     
150     this.aaiEntityStats.intializeEntityCounters(entityType);
151     this.esEntityStats.intializeEntityCounters(entityType);
152         
153     this.contextMap = MDC.getCopyOfContextMap();
154   }
155
156   /**
157    * Collect all the work.
158    *
159    * @return the operation state
160    */
161   private OperationState collectAllTheWork() {
162     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
163     final String entity = this.getEntityType();
164     try {
165
166       aaiWorkOnHand.set(1);
167
168       supplyAsync(new Supplier<Void>() {
169
170         @Override
171         public Void get() {
172           MDC.setContextMap(contextMap);
173           OperationResult typeLinksResult = null;
174           try {
175             typeLinksResult = aaiAdapter.getSelfLinksByEntityType(entity);
176             aaiWorkOnHand.decrementAndGet();
177             processEntityTypeSelfLinks(typeLinksResult);
178           } catch (Exception exc) {
179             // TODO -> LOG, what should be logged here?
180             
181             exc.printStackTrace();
182           }
183
184           return null;
185         }
186
187       }, aaiExecutor).whenComplete((result, error) -> {
188
189         if (error != null) {
190           LOG.error(AaiUiMsgs.ERROR_GENERIC,
191               "An error occurred getting data from AAI. Error = " + error.getMessage());
192         }
193       });
194
195       while (aaiWorkOnHand.get() != 0) {
196
197         if (LOG.isDebugEnabled()) {
198           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
199         }
200
201         Thread.sleep(1000);
202       }
203
204       aaiWorkOnHand.set(selflinks.size());
205       allWorkEnumerated = true;
206       syncEntityTypes();
207
208       while (!isSyncDone()) {
209         performRetrySync();
210         Thread.sleep(1000);
211       }
212
213       /*
214        * Make sure we don't hang on to retries that failed which could cause issues during future
215        * syncs
216        */
217       retryLimitTracker.clear();
218
219     } catch (Exception exc) {
220       // TODO -> LOG, waht should be logged here?
221     }
222
223     return OperationState.OK;
224   }
225   
226   
227   /**
228    * Perform retry sync.
229    */
230   private void performRetrySync() {
231     while (retryQueue.peek() != null) {
232
233       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
234       if (rsc != null) {
235
236         AggregationEntity ae = rsc.getAggregationEntity();
237         NetworkTransaction txn = rsc.getNetworkTransaction();
238
239         String link = null;
240         try {
241           /*
242            * In this retry flow the se object has already derived its fields
243            */
244           link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
245         } catch (Exception exc) {
246           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
247         }
248
249         if (link != null) {
250           NetworkTransaction retryTransaction = new NetworkTransaction();
251           retryTransaction.setLink(link);
252           retryTransaction.setEntityType(txn.getEntityType());
253           retryTransaction.setDescriptor(txn.getDescriptor());
254           retryTransaction.setOperationType(HttpMethod.GET);
255
256           /*
257            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
258            * called incrementAndGet when queuing the failed PUT!
259            */
260
261           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, elasticSearchAdapter),
262               esExecutor).whenComplete((result, error) -> {
263
264                 esWorkOnHand.decrementAndGet();
265
266                 if (error != null) {
267                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
268                 } else {
269                   updateElasticSearchCounters(result);
270                   performDocumentUpsert(result, ae);
271                 }
272               });
273         }
274
275       }
276     }
277   }
278   
279   /**
280    * Perform document upsert.
281    *
282    * @param esGetTxn the es get txn
283    * @param ae the ae
284    */
285   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
286     /**
287      * <p>
288      * <ul>
289      * As part of the response processing we need to do the following:
290      * <li>1. Extract the version (if present), it will be the ETAG when we use the
291      * Search-Abstraction-Service
292      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
293      * tag
294      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
295      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
296      * </ul>
297      * </p>
298      */
299     String link = null;
300     try {
301       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId()); 
302     } catch (Exception exc) {
303       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
304       return;
305     }
306
307     String versionNumber = null;
308     boolean wasEntryDiscovered = false;
309     if (esGetTxn.getOperationResult().getResultCode() == 404) {
310       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
311     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
312       wasEntryDiscovered = true;
313       try {
314         versionNumber = NodeUtils.extractFieldValueFromObject(
315             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
316             "_version");
317       } catch (IOException exc) {
318         String message =
319             "Error extracting version number from response, aborting aggregation entity sync of "
320                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
321         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
322         return;
323       }
324     } else {
325       /*
326        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
327        * return.
328        */
329       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
330           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
331       return;
332     }
333
334     try {
335       String jsonPayload = null;
336       if (wasEntryDiscovered) {
337         try {
338           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
339           NodeUtils.extractObjectsByKey(
340               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
341               "_source", sourceObject);
342
343           if (!sourceObject.isEmpty()) {
344             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
345             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
346             ObjectReader updater = mapper.readerForUpdating(me);
347             MergableEntity merged = updater.readValue(ae.getAsJson());
348             jsonPayload = mapper.writeValueAsString(merged);
349           }
350         } catch (IOException exc) {
351           String message =
352               "Error extracting source value from response, aborting aggregation entity sync of "
353                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
354           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
355           return;
356         }
357       } else {
358         jsonPayload = ae.getAsJson();
359       }
360
361       if (wasEntryDiscovered) {
362         if (versionNumber != null && jsonPayload != null) {
363
364           String requestPayload =
365               elasticSearchAdapter.buildBulkImportOperationRequest(schemaConfig.getIndexName(),
366                   schemaConfig.getIndexDocType(), ae.getId(), versionNumber, jsonPayload);
367
368           NetworkTransaction transactionTracker = new NetworkTransaction();
369           transactionTracker.setEntityType(esGetTxn.getEntityType());
370           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
371           transactionTracker.setOperationType(HttpMethod.PUT);
372
373           esWorkOnHand.incrementAndGet();
374           supplyAsync(new PerformElasticSearchUpdate(elasticSearchAdapter.getBulkUrl(),
375               requestPayload, elasticSearchAdapter, transactionTracker), esPutExecutor)
376                   .whenComplete((result, error) -> {
377
378                     esWorkOnHand.decrementAndGet();
379
380                     if (error != null) {
381                       String message = "Aggregation entity sync UPDATE PUT error - "
382                           + error.getLocalizedMessage();
383                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
384                     } else {
385                       updateElasticSearchCounters(result);
386                       processStoreDocumentResult(result, esGetTxn, ae);
387                     }
388                   });
389         }
390
391       } else {
392         if (link != null && jsonPayload != null) {
393
394           NetworkTransaction updateElasticTxn = new NetworkTransaction();
395           updateElasticTxn.setLink(link);
396           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
397           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
398           updateElasticTxn.setOperationType(HttpMethod.PUT);
399
400           esWorkOnHand.incrementAndGet();
401           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, elasticSearchAdapter),
402               esPutExecutor).whenComplete((result, error) -> {
403
404                 esWorkOnHand.decrementAndGet();
405
406                 if (error != null) {
407                   String message =
408                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
409                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
410                 } else {
411                   updateElasticSearchCounters(result);
412                   processStoreDocumentResult(result, esGetTxn, ae);
413                 }
414               });
415         }
416       }
417     } catch (Exception exc) {
418       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
419           + exc.getLocalizedMessage();
420       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
421     }
422   }
423   
424   /**
425    * Should allow retry.
426    *
427    * @param id the id
428    * @return true, if successful
429    */
430   private boolean shouldAllowRetry(String id) {
431     boolean isRetryAllowed = true;
432     if (retryLimitTracker.get(id) != null) {
433       Integer currentCount = retryLimitTracker.get(id);
434       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
435         isRetryAllowed = false;
436         String message = "Aggregation entity re-sync limit reached for " + id
437             + ", re-sync will no longer be attempted for this entity";
438         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
439       } else {
440         Integer newCount = new Integer(currentCount.intValue() + 1);
441         retryLimitTracker.put(id, newCount);
442       }
443     } else {
444       Integer firstRetryCount = new Integer(1);
445       retryLimitTracker.put(id, firstRetryCount);
446     }
447
448     return isRetryAllowed;
449   }
450   
451   /**
452    * Process store document result.
453    *
454    * @param esPutResult the es put result
455    * @param esGetResult the es get result
456    * @param ae the ae
457    */
458   private void processStoreDocumentResult(NetworkTransaction esPutResult,
459       NetworkTransaction esGetResult, AggregationEntity ae) {
460
461     OperationResult or = esPutResult.getOperationResult();
462
463     if (!or.wasSuccessful()) {
464       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
465
466         if (shouldAllowRetry(ae.getId())) {
467           esWorkOnHand.incrementAndGet();
468
469           RetryAggregationEntitySyncContainer rsc =
470               new RetryAggregationEntitySyncContainer(esGetResult, ae);
471           retryQueue.push(rsc);
472
473           String message = "Store document failed during aggregation entity synchronization"
474               + " due to version conflict. Entity will be re-synced.";
475           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
476         }
477       } else {
478         String message =
479             "Store document failed during aggregation entity synchronization with result code "
480                 + or.getResultCode() + " and result message " + or.getResult();
481         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
482       }
483     }
484   }
485   
486   /**
487    * Sync entity types.
488    */
489   private void syncEntityTypes() {
490
491     while (selflinks.peek() != null) {
492
493       SelfLinkDescriptor linkDescriptor = selflinks.poll();
494       aaiWorkOnHand.decrementAndGet();
495
496       OxmEntityDescriptor descriptor = null;
497
498       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
499
500         descriptor = oxmEntityLookup.getEntityDescriptors().get(linkDescriptor.getEntityType());
501
502         if (descriptor == null) {
503           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
504           // go to next element in iterator
505           continue;
506         }
507
508         NetworkTransaction txn = new NetworkTransaction();
509         txn.setDescriptor(descriptor);
510         txn.setLink(linkDescriptor.getSelfLink());
511         txn.setOperationType(HttpMethod.GET);
512         txn.setEntityType(linkDescriptor.getEntityType());
513
514         aaiWorkOnHand.incrementAndGet();
515
516         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiAdapter), aaiExecutor)
517             .whenComplete((result, error) -> {
518
519               aaiWorkOnHand.decrementAndGet();
520
521               if (error != null) {
522                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
523               } else {
524                 if (result == null) {
525                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
526                       linkDescriptor.getSelfLink());
527                 } else {
528                   updateActiveInventoryCounters(result);
529                   fetchDocumentForUpsert(result);
530                 }
531               }
532             });
533       }
534
535     }
536
537   }
538   
539   /**
540    * Fetch document for upsert.
541    *
542    * @param txn the txn
543    */
544   private void fetchDocumentForUpsert(NetworkTransaction txn) {
545     // modified
546     if (!txn.getOperationResult().wasSuccessful()) {
547       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
548       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
549       return;
550     }
551
552     try {
553         final String jsonResult = txn.getOperationResult().getResult();
554         if (jsonResult != null && jsonResult.length() > 0) {
555
556           AggregationEntity ae = new AggregationEntity();
557           ae.setLink(ActiveInventoryAdapter.extractResourcePath(txn.getLink()));
558           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
559           ae.deriveFields();
560
561           String link = null;
562           try {
563             link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), ae.getId());
564           } catch (Exception exc) {
565             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
566           }
567
568           if (link != null) {
569             NetworkTransaction n2 = new NetworkTransaction();
570             n2.setLink(link);
571             n2.setEntityType(txn.getEntityType());
572             n2.setDescriptor(txn.getDescriptor());
573             n2.setOperationType(HttpMethod.GET);
574
575             esWorkOnHand.incrementAndGet();
576
577             supplyAsync(new PerformElasticSearchRetrieval(n2, elasticSearchAdapter), esExecutor)
578                 .whenComplete((result, error) -> {
579
580                   esWorkOnHand.decrementAndGet();
581
582                   if (error != null) {
583                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
584                   } else {
585                     updateElasticSearchCounters(result);
586                     performDocumentUpsert(result, ae);
587                   }
588                 });
589           }
590         }
591
592      } catch (JsonProcessingException exc) {
593       // TODO -> LOG, waht should be logged here?
594     } catch (IOException exc) {
595       // TODO -> LOG, waht should be logged here?
596     }
597   }
598   
599   
600   /**
601    * Populate aggregation entity document.
602    *
603    * @param doc the doc
604    * @param result the result
605    * @param resultDescriptor the result descriptor
606    * @throws JsonProcessingException the json processing exception
607    * @throws IOException Signals that an I/O exception has occurred.
608    */
609   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
610       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
611     doc.setEntityType(resultDescriptor.getEntityName());
612     JsonNode entityNode = mapper.readTree(result);
613     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
614     doc.copyAttributeKeyValuePair(map);
615   }
616   
617   /**
618    * Process entity type self links.
619    *
620    * @param operationResult the operation result
621    */
622   private void processEntityTypeSelfLinks(OperationResult operationResult) {
623
624     JsonNode rootNode = null;
625
626     final String jsonResult = operationResult.getResult();
627
628     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
629
630       try {
631         rootNode = mapper.readTree(jsonResult);
632       } catch (IOException exc) {
633         String message =
634             "Could not deserialize JSON (representing operation result) as node tree. " +
635             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
636         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
637       }
638
639       JsonNode resultData = rootNode.get("result-data");
640       ArrayNode resultDataArrayNode = null;
641
642       if (resultData.isArray()) {
643         resultDataArrayNode = (ArrayNode) resultData;
644
645         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
646         JsonNode element = null;
647
648         while (elementIterator.hasNext()) {
649           element = elementIterator.next();
650
651           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
652           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
653
654           OxmEntityDescriptor descriptor = null;
655
656           if (resourceType != null && resourceLink != null) {
657
658             descriptor = oxmEntityLookup.getEntityDescriptors().get(resourceType);
659
660             if (descriptor == null) {
661               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
662               // go to next element in iterator
663               continue;
664             }
665
666             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConstants.NODES_ONLY_MODIFIER, resourceType));
667             
668
669           }
670         }
671       }
672     }
673
674   }
675
676   /*
677    * (non-Javadoc)
678    * 
679    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
680    */
681   @Override
682   public OperationState doSync() {
683     this.syncDurationInMs = -1;
684     syncStartedTimeStampInMs = System.currentTimeMillis();
685     String txnID = NodeUtils.getRandomTxnId();
686     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
687     
688     return collectAllTheWork();
689   }
690
691   @Override
692   public SynchronizerState getState() {
693
694     if (!isSyncDone()) {
695       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
696     }
697
698     return SynchronizerState.IDLE;
699
700   }
701
702   /*
703    * (non-Javadoc)
704    * 
705    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
706    */
707   @Override
708   public String getStatReport(boolean showFinalReport) {
709     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
710     return getStatReport(syncDurationInMs, showFinalReport);
711   }
712
713   public String getEntityType() {
714     return entityType;
715   }
716
717   public void setEntityType(String entityType) {
718     this.entityType = entityType;
719   }
720
721   /*
722    * (non-Javadoc)
723    * 
724    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
725    */
726   @Override
727   public void shutdown() {
728     this.shutdownExecutors();
729   }
730
731   @Override
732   protected boolean isSyncDone() {
733
734     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
735
736     if (LOG.isDebugEnabled()) {
737       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
738           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
739     }
740
741     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
742       return false;
743     }
744
745     this.syncInProgress = false;
746
747     return true;
748   }
749
750   /*
751    * (non-Javadoc)
752    * 
753    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
754    */
755   @Override
756   public void clearCache() {
757
758     if (syncInProgress) {
759       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
760           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
761       return;
762     }
763
764     super.clearCache();
765     this.resetCounters();
766     if (entityCounters != null) {
767       entityCounters.clear();
768     }
769
770     allWorkEnumerated = false;
771
772   }
773
774 }