Initial commit for AAI-UI(sparky-backend)
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AggregationSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.sql.Timestamp;
34 import java.text.SimpleDateFormat;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.Deque;
38 import java.util.EnumSet;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentLinkedDeque;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.function.Supplier;
50
51 import javax.json.Json;
52
53 import org.openecomp.cl.api.Logger;
54 import org.openecomp.cl.eelf.LoggerFactory;
55 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
56 import org.openecomp.sparky.dal.NetworkTransaction;
57 import org.openecomp.sparky.dal.elasticsearch.config.ElasticSearchConfig;
58 import org.openecomp.sparky.dal.rest.HttpMethod;
59 import org.openecomp.sparky.dal.rest.OperationResult;
60 import org.openecomp.sparky.logging.AaiUiMsgs;
61 import org.openecomp.sparky.synchronizer.config.SynchronizerConfiguration;
62 import org.openecomp.sparky.synchronizer.entity.AggregationEntity;
63 import org.openecomp.sparky.synchronizer.entity.MergableEntity;
64 import org.openecomp.sparky.synchronizer.entity.SelfLinkDescriptor;
65 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
66 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
67 import org.openecomp.sparky.synchronizer.task.PerformActiveInventoryRetrieval;
68 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
69 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchRetrieval;
70 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchUpdate;
71 import org.openecomp.sparky.util.NodeUtils;
72 import org.slf4j.MDC;
73
74 import org.openecomp.cl.mdc.MdcContext;
75 import com.fasterxml.jackson.core.JsonProcessingException;
76 import com.fasterxml.jackson.databind.JsonNode;
77 import com.fasterxml.jackson.databind.ObjectReader;
78 import com.fasterxml.jackson.databind.node.ArrayNode;
79
80 /**
81  * The Class AutosuggestionSynchronizer.
82  */
83 public class AggregationSynchronizer extends AbstractEntitySynchronizer
84     implements IndexSynchronizer {
85
86   /**
87    * The Class RetryAggregationEntitySyncContainer.
88    */
89   private class RetryAggregationEntitySyncContainer {
90     NetworkTransaction txn;
91     AggregationEntity ae;
92
93     /**
94      * Instantiates a new retry aggregation entity sync container.
95      *
96      * @param txn the txn
97      * @param ae the se
98      */
99     public RetryAggregationEntitySyncContainer(NetworkTransaction txn, AggregationEntity ae) {
100       this.txn = txn;
101       this.ae = ae;
102     }
103
104     public NetworkTransaction getNetworkTransaction() {
105       return txn;
106     }
107
108     public AggregationEntity getAggregationEntity() {
109       return ae;
110     }
111   }
112   
113   private static final Logger LOG =
114       LoggerFactory.getInstance().getLogger(AggregationSynchronizer.class);
115   private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
116
117   private boolean allWorkEnumerated;
118   private Deque<SelfLinkDescriptor> selflinks;
119   private Deque<RetryAggregationEntitySyncContainer> retryQueue;
120   private Map<String, Integer> retryLimitTracker;
121   protected ExecutorService esPutExecutor;
122   private ConcurrentHashMap<String, AtomicInteger> entityCounters;
123   private boolean syncInProgress;
124   private Map<String, String> contextMap;
125   private String entityType;
126
127   /**
128    * Instantiates a new entity aggregation synchronizer.
129    *
130    * @param indexName the index name
131    * @throws Exception the exception
132    */
133   public AggregationSynchronizer(String entityType, String indexName) throws Exception {
134     super(LOG, "AGGES-" + indexName.toUpperCase(), 2, 5, 5, indexName); // multiple Autosuggestion
135                                                                        // Entity Synchronizer will
136                                                                        // run for different indices
137     
138     this.entityType = entityType;
139     this.allWorkEnumerated = false;
140     this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
141     this.synchronizerName = "Entity Aggregation Synchronizer";
142     this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
143     this.syncInProgress = false;
144     this.allWorkEnumerated = false;
145     this.selflinks = new ConcurrentLinkedDeque<SelfLinkDescriptor>();
146     this.retryQueue = new ConcurrentLinkedDeque<RetryAggregationEntitySyncContainer>();
147     this.retryLimitTracker = new ConcurrentHashMap<String, Integer>();
148     
149     this.esPutExecutor = NodeUtils.createNamedExecutor("AGGES-ES-PUT", 1, LOG);
150     Map<String, OxmEntityDescriptor> descriptor = new HashMap<String, OxmEntityDescriptor>();
151     descriptor.put(entityType, oxmModelLoader.getEntityDescriptors().get(entityType));
152     this.aaiEntityStats.initializeCountersFromOxmEntityDescriptors(
153         descriptor);
154     this.esEntityStats.initializeCountersFromOxmEntityDescriptors(
155         descriptor);
156     this.contextMap = MDC.getCopyOfContextMap();
157   }
158
159   /**
160    * Collect all the work.
161    *
162    * @return the operation state
163    */
164   private OperationState collectAllTheWork() {
165     final Map<String, String> contextMap = MDC.getCopyOfContextMap();
166     final String entity = this.getEntityType();
167     try {
168
169       aaiWorkOnHand.set(1);
170
171       supplyAsync(new Supplier<Void>() {
172
173         @Override
174         public Void get() {
175           MDC.setContextMap(contextMap);
176           OperationResult typeLinksResult = null;
177           try {
178             typeLinksResult = aaiDataProvider.getSelfLinksByEntityType(entity);
179             aaiWorkOnHand.decrementAndGet();
180             processEntityTypeSelfLinks(typeLinksResult);
181           } catch (Exception exc) {
182             // TODO -> LOG, what should be logged here?
183           }
184
185           return null;
186         }
187
188       }, aaiExecutor).whenComplete((result, error) -> {
189
190         if (error != null) {
191           LOG.error(AaiUiMsgs.ERROR_GENERIC,
192               "An error occurred getting data from AAI. Error = " + error.getMessage());
193         }
194       });
195
196       while (aaiWorkOnHand.get() != 0) {
197
198         if (LOG.isDebugEnabled()) {
199           LOG.debug(AaiUiMsgs.WAIT_FOR_ALL_SELFLINKS_TO_BE_COLLECTED);
200         }
201
202         Thread.sleep(1000);
203       }
204
205       aaiWorkOnHand.set(selflinks.size());
206       allWorkEnumerated = true;
207       syncEntityTypes();
208
209       while (!isSyncDone()) {
210         performRetrySync();
211         Thread.sleep(1000);
212       }
213
214       /*
215        * Make sure we don't hang on to retries that failed which could cause issues during future
216        * syncs
217        */
218       retryLimitTracker.clear();
219
220     } catch (Exception exc) {
221       // TODO -> LOG, waht should be logged here?
222     }
223
224     return OperationState.OK;
225   }
226   
227   
228   /**
229    * Perform retry sync.
230    */
231   private void performRetrySync() {
232     while (retryQueue.peek() != null) {
233
234       RetryAggregationEntitySyncContainer rsc = retryQueue.poll();
235       if (rsc != null) {
236
237         AggregationEntity ae = rsc.getAggregationEntity();
238         NetworkTransaction txn = rsc.getNetworkTransaction();
239
240         String link = null;
241         try {
242           /*
243            * In this retry flow the se object has already derived its fields
244            */
245           link = getElasticFullUrl("/" + ae.getId(), getIndexName());
246         } catch (Exception exc) {
247           LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_URI, exc.getLocalizedMessage());
248         }
249
250         if (link != null) {
251           NetworkTransaction retryTransaction = new NetworkTransaction();
252           retryTransaction.setLink(link);
253           retryTransaction.setEntityType(txn.getEntityType());
254           retryTransaction.setDescriptor(txn.getDescriptor());
255           retryTransaction.setOperationType(HttpMethod.GET);
256
257           /*
258            * IMPORTANT - DO NOT incrementAndGet the esWorkOnHand as this is a retry flow! We already
259            * called incrementAndGet when queuing the failed PUT!
260            */
261
262           supplyAsync(new PerformElasticSearchRetrieval(retryTransaction, esDataProvider),
263               esExecutor).whenComplete((result, error) -> {
264
265                 esWorkOnHand.decrementAndGet();
266
267                 if (error != null) {
268                   LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED_RESYNC, error.getLocalizedMessage());
269                 } else {
270                   updateElasticSearchCounters(result);
271                   performDocumentUpsert(result, ae);
272                 }
273               });
274         }
275
276       }
277     }
278   }
279   
280   /**
281    * Perform document upsert.
282    *
283    * @param esGetTxn the es get txn
284    * @param ae the ae
285    */
286   protected void performDocumentUpsert(NetworkTransaction esGetTxn, AggregationEntity ae) {
287     /**
288      * <p>
289      * <ul>
290      * As part of the response processing we need to do the following:
291      * <li>1. Extract the version (if present), it will be the ETAG when we use the
292      * Search-Abstraction-Service
293      * <li>2. Spawn next task which is to do the PUT operation into elastic with or with the version
294      * tag
295      * <li>a) if version is null or RC=404, then standard put, no _update with version tag
296      * <li>b) if version != null, do PUT with _update?version= versionNumber in the URI to elastic
297      * </ul>
298      * </p>
299      */
300     String link = null;
301     try {
302       link = getElasticFullUrl("/" + ae.getId(), getIndexName());
303     } catch (Exception exc) {
304       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
305       return;
306     }
307
308     String versionNumber = null;
309     boolean wasEntryDiscovered = false;
310     if (esGetTxn.getOperationResult().getResultCode() == 404) {
311       LOG.info(AaiUiMsgs.ES_SIMPLE_PUT, ae.getEntityPrimaryKeyValue());
312     } else if (esGetTxn.getOperationResult().getResultCode() == 200) {
313       wasEntryDiscovered = true;
314       try {
315         versionNumber = NodeUtils.extractFieldValueFromObject(
316             NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
317             "_version");
318       } catch (IOException exc) {
319         String message =
320             "Error extracting version number from response, aborting aggregation entity sync of "
321                 + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
322         LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
323         return;
324       }
325     } else {
326       /*
327        * Not being a 200 does not mean a failure. eg 201 is returned for created. TODO -> Should we
328        * return.
329        */
330       LOG.error(AaiUiMsgs.ES_OPERATION_RETURN_CODE,
331           String.valueOf(esGetTxn.getOperationResult().getResultCode()));
332       return;
333     }
334
335     try {
336       String jsonPayload = null;
337       if (wasEntryDiscovered) {
338         try {
339           ArrayList<JsonNode> sourceObject = new ArrayList<JsonNode>();
340           NodeUtils.extractObjectsByKey(
341               NodeUtils.convertJsonStrToJsonNode(esGetTxn.getOperationResult().getResult()),
342               "_source", sourceObject);
343
344           if (!sourceObject.isEmpty()) {
345             String responseSource = NodeUtils.convertObjectToJson(sourceObject.get(0), false);
346             MergableEntity me = mapper.readValue(responseSource, MergableEntity.class);
347             ObjectReader updater = mapper.readerForUpdating(me);
348             MergableEntity merged = updater.readValue(ae.getIndexDocumentJson());
349             jsonPayload = mapper.writeValueAsString(merged);
350           }
351         } catch (IOException exc) {
352           String message =
353               "Error extracting source value from response, aborting aggregation entity sync of "
354                   + ae.getEntityPrimaryKeyValue() + ". Error - " + exc.getLocalizedMessage();
355           LOG.error(AaiUiMsgs.ERROR_EXTRACTING_FROM_RESPONSE, message);
356           return;
357         }
358       } else {
359         jsonPayload = ae.getIndexDocumentJson();
360       }
361
362       if (wasEntryDiscovered) {
363         if (versionNumber != null && jsonPayload != null) {
364
365           String requestPayload = esDataProvider.buildBulkImportOperationRequest(getIndexName(),
366               ElasticSearchConfig.getConfig().getType(), ae.getId(), versionNumber, jsonPayload);
367
368           NetworkTransaction transactionTracker = new NetworkTransaction();
369           transactionTracker.setEntityType(esGetTxn.getEntityType());
370           transactionTracker.setDescriptor(esGetTxn.getDescriptor());
371           transactionTracker.setOperationType(HttpMethod.PUT);
372
373           esWorkOnHand.incrementAndGet();
374           supplyAsync(new PerformElasticSearchUpdate(ElasticSearchConfig.getConfig().getBulkUrl(),
375               requestPayload, esDataProvider, transactionTracker), esPutExecutor)
376                   .whenComplete((result, error) -> {
377
378                     esWorkOnHand.decrementAndGet();
379
380                     if (error != null) {
381                       String message = "Aggregation entity sync UPDATE PUT error - "
382                           + error.getLocalizedMessage();
383                       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
384                     } else {
385                       updateElasticSearchCounters(result);
386                       processStoreDocumentResult(result, esGetTxn, ae);
387                     }
388                   });
389         }
390
391       } else {
392         if (link != null && jsonPayload != null) {
393
394           NetworkTransaction updateElasticTxn = new NetworkTransaction();
395           updateElasticTxn.setLink(link);
396           updateElasticTxn.setEntityType(esGetTxn.getEntityType());
397           updateElasticTxn.setDescriptor(esGetTxn.getDescriptor());
398           updateElasticTxn.setOperationType(HttpMethod.PUT);
399
400           esWorkOnHand.incrementAndGet();
401           supplyAsync(new PerformElasticSearchPut(jsonPayload, updateElasticTxn, esDataProvider),
402               esPutExecutor).whenComplete((result, error) -> {
403
404                 esWorkOnHand.decrementAndGet();
405
406                 if (error != null) {
407                   String message =
408                       "Aggregation entity sync UPDATE PUT error - " + error.getLocalizedMessage();
409                   LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
410                 } else {
411                   updateElasticSearchCounters(result);
412                   processStoreDocumentResult(result, esGetTxn, ae);
413                 }
414               });
415         }
416       }
417     } catch (Exception exc) {
418       String message = "Exception caught during aggregation entity sync PUT operation. Message - "
419           + exc.getLocalizedMessage();
420       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
421     }
422   }
423   
424   /**
425    * Should allow retry.
426    *
427    * @param id the id
428    * @return true, if successful
429    */
430   private boolean shouldAllowRetry(String id) {
431     boolean isRetryAllowed = true;
432     if (retryLimitTracker.get(id) != null) {
433       Integer currentCount = retryLimitTracker.get(id);
434       if (currentCount.intValue() >= RETRY_COUNT_PER_ENTITY_LIMIT.intValue()) {
435         isRetryAllowed = false;
436         String message = "Aggregation entity re-sync limit reached for " + id
437             + ", re-sync will no longer be attempted for this entity";
438         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
439       } else {
440         Integer newCount = new Integer(currentCount.intValue() + 1);
441         retryLimitTracker.put(id, newCount);
442       }
443     } else {
444       Integer firstRetryCount = new Integer(1);
445       retryLimitTracker.put(id, firstRetryCount);
446     }
447
448     return isRetryAllowed;
449   }
450   
451   /**
452    * Process store document result.
453    *
454    * @param esPutResult the es put result
455    * @param esGetResult the es get result
456    * @param ae the ae
457    */
458   private void processStoreDocumentResult(NetworkTransaction esPutResult,
459       NetworkTransaction esGetResult, AggregationEntity ae) {
460
461     OperationResult or = esPutResult.getOperationResult();
462
463     if (!or.wasSuccessful()) {
464       if (or.getResultCode() == VERSION_CONFLICT_EXCEPTION_CODE) {
465
466         if (shouldAllowRetry(ae.getId())) {
467           esWorkOnHand.incrementAndGet();
468
469           RetryAggregationEntitySyncContainer rsc =
470               new RetryAggregationEntitySyncContainer(esGetResult, ae);
471           retryQueue.push(rsc);
472
473           String message = "Store document failed during aggregation entity synchronization"
474               + " due to version conflict. Entity will be re-synced.";
475           LOG.warn(AaiUiMsgs.ERROR_GENERIC, message);
476         }
477       } else {
478         String message =
479             "Store document failed during aggregation entity synchronization with result code "
480                 + or.getResultCode() + " and result message " + or.getResult();
481         LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
482       }
483     }
484   }
485   
486   /**
487    * Sync entity types.
488    */
489   private void syncEntityTypes() {
490
491     while (selflinks.peek() != null) {
492
493       SelfLinkDescriptor linkDescriptor = selflinks.poll();
494       aaiWorkOnHand.decrementAndGet();
495
496       OxmEntityDescriptor descriptor = null;
497
498       if (linkDescriptor.getSelfLink() != null && linkDescriptor.getEntityType() != null) {
499
500         descriptor = oxmModelLoader.getEntityDescriptor(linkDescriptor.getEntityType());
501
502         if (descriptor == null) {
503           LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, linkDescriptor.getEntityType());
504           // go to next element in iterator
505           continue;
506         }
507
508         NetworkTransaction txn = new NetworkTransaction();
509         txn.setDescriptor(descriptor);
510         txn.setLink(linkDescriptor.getSelfLink());
511         txn.setOperationType(HttpMethod.GET);
512         txn.setEntityType(linkDescriptor.getEntityType());
513
514         aaiWorkOnHand.incrementAndGet();
515
516         supplyAsync(new PerformActiveInventoryRetrieval(txn, aaiDataProvider), aaiExecutor)
517             .whenComplete((result, error) -> {
518
519               aaiWorkOnHand.decrementAndGet();
520
521               if (error != null) {
522                 LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_GENERIC, error.getLocalizedMessage());
523               } else {
524                 if (result == null) {
525                   LOG.error(AaiUiMsgs.AAI_RETRIEVAL_FAILED_FOR_SELF_LINK,
526                       linkDescriptor.getSelfLink());
527                 } else {
528                   updateActiveInventoryCounters(result);
529                   fetchDocumentForUpsert(result);
530                 }
531               }
532             });
533       }
534
535     }
536
537   }
538   
539   /**
540    * Fetch document for upsert.
541    *
542    * @param txn the txn
543    */
544   private void fetchDocumentForUpsert(NetworkTransaction txn) {
545     // modified
546     if (!txn.getOperationResult().wasSuccessful()) {
547       String message = "Self link failure. Result - " + txn.getOperationResult().getResult();
548       LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
549       return;
550     }
551
552     try {
553         final String jsonResult = txn.getOperationResult().getResult();
554         if (jsonResult != null && jsonResult.length() > 0) {
555
556           AggregationEntity ae = new AggregationEntity(oxmModelLoader);
557           ae.setLink( txn.getLink() );
558           populateAggregationEntityDocument(ae, jsonResult, txn.getDescriptor());
559           ae.deriveFields();
560
561           String link = null;
562           try {
563             link = getElasticFullUrl("/" + ae.getId(), getIndexName());
564           } catch (Exception exc) {
565             LOG.error(AaiUiMsgs.ES_FAILED_TO_CONSTRUCT_QUERY, exc.getLocalizedMessage());
566           }
567
568           if (link != null) {
569             NetworkTransaction n2 = new NetworkTransaction();
570             n2.setLink(link);
571             n2.setEntityType(txn.getEntityType());
572             n2.setDescriptor(txn.getDescriptor());
573             n2.setOperationType(HttpMethod.GET);
574
575             esWorkOnHand.incrementAndGet();
576
577             supplyAsync(new PerformElasticSearchRetrieval(n2, esDataProvider), esExecutor)
578                 .whenComplete((result, error) -> {
579
580                   esWorkOnHand.decrementAndGet();
581
582                   if (error != null) {
583                     LOG.error(AaiUiMsgs.ES_RETRIEVAL_FAILED, error.getLocalizedMessage());
584                   } else {
585                     updateElasticSearchCounters(result);
586                     performDocumentUpsert(result, ae);
587                   }
588                 });
589           }
590         }
591
592      } catch (JsonProcessingException exc) {
593       // TODO -> LOG, waht should be logged here?
594     } catch (IOException exc) {
595       // TODO -> LOG, waht should be logged here?
596     }
597   }
598   
599   
600   /**
601    * Populate aggregation entity document.
602    *
603    * @param doc the doc
604    * @param result the result
605    * @param resultDescriptor the result descriptor
606    * @throws JsonProcessingException the json processing exception
607    * @throws IOException Signals that an I/O exception has occurred.
608    */
609   protected void populateAggregationEntityDocument(AggregationEntity doc, String result,
610       OxmEntityDescriptor resultDescriptor) throws JsonProcessingException, IOException {
611     doc.setEntityType(resultDescriptor.getEntityName());
612     JsonNode entityNode = mapper.readTree(result);
613     Map<String, Object> map = mapper.convertValue(entityNode, Map.class);
614     doc.copyAttributeKeyValuePair(map);
615   }
616   
617   /**
618    * Process entity type self links.
619    *
620    * @param operationResult the operation result
621    */
622   private void processEntityTypeSelfLinks(OperationResult operationResult) {
623
624     JsonNode rootNode = null;
625
626     final String jsonResult = operationResult.getResult();
627
628     if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
629
630       try {
631         rootNode = mapper.readTree(jsonResult);
632       } catch (IOException exc) {
633         String message =
634             "Could not deserialize JSON (representing operation result) as node tree. " +
635             "Operation result = " + jsonResult + ". " + exc.getLocalizedMessage();
636         LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, message);
637       }
638
639       JsonNode resultData = rootNode.get("result-data");
640       ArrayNode resultDataArrayNode = null;
641
642       if (resultData.isArray()) {
643         resultDataArrayNode = (ArrayNode) resultData;
644
645         Iterator<JsonNode> elementIterator = resultDataArrayNode.elements();
646         JsonNode element = null;
647
648         while (elementIterator.hasNext()) {
649           element = elementIterator.next();
650
651           final String resourceType = NodeUtils.getNodeFieldAsText(element, "resource-type");
652           final String resourceLink = NodeUtils.getNodeFieldAsText(element, "resource-link");
653
654           OxmEntityDescriptor descriptor = null;
655
656           if (resourceType != null && resourceLink != null) {
657
658             descriptor = oxmModelLoader.getEntityDescriptor(resourceType);
659
660             if (descriptor == null) {
661               LOG.error(AaiUiMsgs.MISSING_ENTITY_DESCRIPTOR, resourceType);
662               // go to next element in iterator
663               continue;
664             }
665
666             selflinks.add(new SelfLinkDescriptor(resourceLink, SynchronizerConfiguration.NODES_ONLY_MODIFIER, resourceType));
667             
668
669           }
670         }
671       }
672     }
673
674   }
675
676   /*
677    * (non-Javadoc)
678    * 
679    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
680    */
681   @Override
682   public OperationState doSync() {
683     String txnID = NodeUtils.getRandomTxnId();
684     MdcContext.initialize(txnID, "AggregationSynchronizer", "", "Sync", "");
685     
686     return collectAllTheWork();
687   }
688
689   @Override
690   public SynchronizerState getState() {
691
692     if (!isSyncDone()) {
693       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
694     }
695
696     return SynchronizerState.IDLE;
697
698   }
699
700   /*
701    * (non-Javadoc)
702    * 
703    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
704    */
705   @Override
706   public String getStatReport(boolean showFinalReport) {
707     return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs,
708         showFinalReport);
709   }
710
711   public String getEntityType() {
712     return entityType;
713   }
714
715   public void setEntityType(String entityType) {
716     this.entityType = entityType;
717   }
718
719   /*
720    * (non-Javadoc)
721    * 
722    * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
723    */
724   @Override
725   public void shutdown() {
726     this.shutdownExecutors();
727   }
728
729   @Override
730   protected boolean isSyncDone() {
731
732     int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
733
734     if (LOG.isDebugEnabled()) {
735       LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = "
736           + totalWorkOnHand + " all work enumerated = " + allWorkEnumerated);
737     }
738
739     if (totalWorkOnHand > 0 || !allWorkEnumerated) {
740       return false;
741     }
742
743     this.syncInProgress = false;
744
745     return true;
746   }
747
748   /*
749    * (non-Javadoc)
750    * 
751    * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
752    */
753   @Override
754   public void clearCache() {
755
756     if (syncInProgress) {
757       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
758           "Autosuggestion Entity Summarizer in progress, request to clear cache ignored");
759       return;
760     }
761
762     super.clearCache();
763     this.resetCounters();
764     if (entityCounters != null) {
765       entityCounters.clear();
766     }
767
768     allWorkEnumerated = false;
769
770   }
771
772 }